1use openmls::framing::MlsMessageOut;
8use openmls::prelude::{
9 tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
10 KeyPackageBuilder,
11};
12use openmls_basic_credential::SignatureKeyPair;
13use openmls_traits::OpenMlsProvider;
14use parking_lot::RwLock;
15use ping_mls_store::{PersistentMlsProvider, StorageBackend};
16use std::collections::HashMap;
17use std::sync::Arc;
18use zeroize::Zeroizing;
19
20use crate::{
21 codec,
22 conversation::{Conversation, ConversationId, ConversationMeta},
23 device::{
24 CatchupAppEventEntry, CatchupConversationEntry, CatchupSnapshot, DeviceId, DeviceInfo,
25 LinkingTicket, LocalDevice, CATCHUP_SNAPSHOT_VERSION,
26 },
27 error::{Error, Result},
28 identity::{Identity, UserId},
29 message::{IncomingMessage, MessageEnvelope, MessageKind},
30 storage::Storage,
31 sync::SyncCursor,
32 transport::Transport,
33};
34
35const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
36
37#[derive(Debug)]
38pub struct ClientConfig {
39 pub identity: Identity,
40 pub device_label: String,
41 pub storage: Arc<dyn Storage>,
42 pub transport: Arc<dyn Transport>,
43 pub now_ms: u64,
45 pub storage_backend: StorageBackend,
50}
51
52impl ClientConfig {
53 pub fn new_in_memory(
56 identity: Identity,
57 device_label: String,
58 storage: Arc<dyn Storage>,
59 transport: Arc<dyn Transport>,
60 now_ms: u64,
61 ) -> Self {
62 Self {
63 identity,
64 device_label,
65 storage,
66 transport,
67 now_ms,
68 storage_backend: StorageBackend::Memory,
69 }
70 }
71}
72
73pub struct MessagingClient {
74 pub(crate) identity: Identity,
75 pub(crate) local_device: LocalDevice,
76 pub(crate) crypto: Arc<PersistentMlsProvider>,
77 pub(crate) signing: Arc<SignatureKeyPair>,
78 pub(crate) storage: Arc<dyn Storage>,
79 pub(crate) transport: Arc<dyn Transport>,
80 conversations: RwLock<HashMap<ConversationId, Conversation>>,
81}
82
83impl std::fmt::Debug for MessagingClient {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("MessagingClient")
86 .field("user_id", &self.identity.user_id().as_hex())
87 .field("device_id", &self.local_device.device_id.as_hex())
88 .field("conversation_count", &self.conversations.read().len())
89 .finish()
90 }
91}
92
93impl MessagingClient {
94 pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
96 let crypto = PersistentMlsProvider::open(cfg.storage_backend.clone())
101 .map_err(|e| Error::Storage(format!("provider open: {e}")))?;
102 let local_device = match cfg.storage.get("device", "local").await? {
103 Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
104 None => {
105 let dev = LocalDevice::generate(
106 cfg.identity.user_id().clone(),
107 cfg.device_label,
108 cfg.now_ms,
109 );
110 let bytes = encode_local_device(&dev)?;
111 cfg.storage.put("device", "local", bytes).await?;
112 dev
113 }
114 };
115
116 let signing = {
124 let sk_bytes = local_device.signing.to_bytes().to_vec();
125 let pk_bytes = local_device.signing.verifying_key().to_bytes().to_vec();
126 let kp = SignatureKeyPair::from_raw(
127 DEFAULT_CIPHERSUITE.signature_algorithm(),
128 sk_bytes,
129 pk_bytes,
130 );
131 kp.store(crypto.storage()).map_err(Error::mls)?;
132 Arc::new(kp)
133 };
134
135 let client = Arc::new(Self {
136 identity: cfg.identity,
137 local_device,
138 crypto,
139 signing,
140 storage: cfg.storage,
141 transport: cfg.transport,
142 conversations: RwLock::new(HashMap::new()),
143 });
144
145 client.rehydrate_conversations(cfg.now_ms).await?;
146 Ok(client)
147 }
148
149 pub fn user_id(&self) -> UserId {
150 self.identity.user_id().clone()
151 }
152 pub fn device_id(&self) -> DeviceId {
153 self.local_device.device_id.clone()
154 }
155 pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
156 self.local_device.info(now_ms)
157 }
158
159 pub fn fresh_key_package(&self) -> Result<Vec<u8>> {
162 let credential_with_key = CredentialWithKey {
163 credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
164 signature_key: self.signing.public().to_vec().into(),
165 };
166 let bundle = KeyPackageBuilder::new()
167 .build(
168 DEFAULT_CIPHERSUITE,
169 self.crypto.as_ref(),
170 self.signing.as_ref(),
171 credential_with_key,
172 )
173 .map_err(Error::mls)?;
174 let msg: MlsMessageOut = bundle.key_package().clone().into();
176 msg.tls_serialize_detached().map_err(Error::mls)
177 }
178
179 pub async fn create_conversation(
181 self: &Arc<Self>,
182 name: Option<String>,
183 now_ms: u64,
184 ) -> Result<ConversationId> {
185 let id = ConversationId::new();
186 let convo = Conversation::create(
187 id,
188 name,
189 self.local_device.device_id.clone(),
190 self.identity.user_id(),
191 self.crypto.clone(),
192 self.signing.clone(),
193 self.storage.clone(),
194 now_ms,
195 )?;
196 convo.snapshot_to_storage().await?;
197 self.conversations.write().insert(id, convo);
198 Ok(id)
199 }
200
201 pub async fn join_conversation(
203 self: &Arc<Self>,
204 welcome_envelope: &MessageEnvelope,
205 now_ms: u64,
206 ) -> Result<ConversationId> {
207 if welcome_envelope.kind != MessageKind::Welcome {
208 return Err(Error::Invalid("expected Welcome envelope".into()));
209 }
210 let convo = Conversation::join(
211 &welcome_envelope.payload,
212 self.local_device.device_id.clone(),
213 self.crypto.clone(),
214 self.signing.clone(),
215 self.storage.clone(),
216 now_ms,
217 )?;
218 let id = convo.id();
219 convo.snapshot_to_storage().await?;
220 self.conversations.write().insert(id, convo);
221 Ok(id)
222 }
223
224 pub fn list_conversations(&self) -> Vec<ConversationMeta> {
225 self.conversations
226 .read()
227 .values()
228 .map(|c| c.meta.clone())
229 .collect()
230 }
231
232 pub async fn send(
234 &self,
235 conv_id: ConversationId,
236 plaintext: Vec<u8>,
237 now_ms: u64,
238 ) -> Result<MessageEnvelope> {
239 let envelope = {
240 let mut guard = self.conversations.write();
241 let convo = guard
242 .get_mut(&conv_id)
243 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
244 convo.send_application(&plaintext, now_ms)?
245 };
246 self.transport.send(envelope.clone()).await?;
247 Ok(envelope)
248 }
249
250 #[allow(clippy::await_holding_lock)]
268 pub async fn add_members(
269 &self,
270 conv_id: ConversationId,
271 entries: Vec<(DeviceId, Vec<u8>)>,
272 now_ms: u64,
273 ) -> Result<()> {
274 let outcome = {
275 let mut guard = self.conversations.write();
276 let convo = guard
277 .get_mut(&conv_id)
278 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
279 convo.add_members(entries, now_ms)?
280 };
281 self.transport.send(outcome.commit).await?;
282 self.transport.send(outcome.welcome).await?;
283 if let Some(c) = self.conversations.read().get(&conv_id) {
284 c.snapshot_to_storage().await?;
285 }
286 Ok(())
287 }
288
289 #[allow(clippy::await_holding_lock)] pub async fn remove_members(
291 &self,
292 conv_id: ConversationId,
293 leaf_indexes: Vec<u32>,
294 now_ms: u64,
295 ) -> Result<()> {
296 let envelope = {
297 let mut guard = self.conversations.write();
298 let convo = guard
299 .get_mut(&conv_id)
300 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
301 convo.remove_members(leaf_indexes, now_ms)?
302 };
303 self.transport.send(envelope).await?;
304 if let Some(c) = self.conversations.read().get(&conv_id) {
305 c.snapshot_to_storage().await?;
306 }
307 Ok(())
308 }
309
310 #[allow(clippy::await_holding_lock)] pub async fn process_envelope(
314 &self,
315 env: &MessageEnvelope,
316 now_ms: u64,
317 ) -> Result<Option<IncomingMessage>> {
318 let mut guard = self.conversations.write();
321 let convo = match guard.get_mut(&env.conversation_id) {
322 Some(c) => c,
323 None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
324 };
325 let out = convo.process(env, now_ms)?;
326 convo.snapshot_to_storage().await?;
328 Ok(out)
329 }
330
331 pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
334 let pending: Vec<(ConversationId, SyncCursor)> = self
335 .conversations
336 .read()
337 .iter()
338 .map(|(id, c)| (*id, c.cursor.clone()))
339 .collect();
340
341 let mut delivered = Vec::new();
342 for (conv_id, cursor) in pending {
343 loop {
344 let batch = self
345 .transport
346 .fetch_since(conv_id, cursor.clone(), 256)
347 .await?;
348 if batch.is_empty() {
349 break;
350 }
351 for env in &batch {
352 if let Some(msg) = self.process_envelope(env, now_ms).await? {
353 delivered.push(msg);
354 }
355 }
356 if batch.len() < 256 {
357 break;
358 } }
360 }
361 Ok(delivered)
362 }
363
364 async fn rehydrate_conversations(self: &Arc<Self>, now_ms: u64) -> Result<()> {
372 let metas = self.storage.list_keys("groups", "").await?;
373 for path in metas {
374 let Some((id_hex, suffix)) = path.split_once('/') else {
376 continue;
377 };
378 if suffix != "meta" {
379 continue;
380 }
381 let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
382 continue;
383 };
384 let meta: ConversationMeta = match codec::decode(&meta_bytes) {
385 Ok(m) => m,
386 Err(_) => continue,
387 };
388 let cursor_bytes = self
389 .storage
390 .get("cursors", id_hex)
391 .await?
392 .unwrap_or_default();
393 let cursor = if cursor_bytes.is_empty() {
394 SyncCursor::default()
395 } else {
396 SyncCursor::decode(&cursor_bytes).unwrap_or_default()
397 };
398
399 let device_leaves_bytes = self
401 .storage
402 .get("device_leaves", id_hex)
403 .await?
404 .unwrap_or_default();
405 let device_leaves: std::collections::BTreeMap<DeviceId, u32> =
406 if device_leaves_bytes.is_empty() {
407 std::collections::BTreeMap::new()
408 } else {
409 let pairs: Vec<(DeviceId, u32)> =
410 codec::decode(&device_leaves_bytes).unwrap_or_default();
411 pairs.into_iter().collect()
412 };
413
414 match Conversation::load(
415 meta.id,
416 meta.clone(),
417 cursor,
418 device_leaves,
419 self.local_device.device_id.clone(),
420 self.crypto.clone(),
421 self.signing.clone(),
422 self.storage.clone(),
423 now_ms,
424 ) {
425 Ok(Some(convo)) => {
426 tracing::debug!(
427 target: "ping_core::client",
428 convo = %id_hex,
429 epoch = meta.epoch,
430 "rehydrated conversation from disk"
431 );
432 self.conversations.write().insert(meta.id, convo);
433 }
434 Ok(None) => {
435 tracing::warn!(
436 target: "ping_core::client",
437 convo = %id_hex,
438 "host-side meta present but OpenMLS state missing — skipping"
439 );
440 }
441 Err(e) => {
442 tracing::warn!(
443 target: "ping_core::client",
444 convo = %id_hex,
445 error = %e,
446 "Conversation::load failed — skipping"
447 );
448 }
449 }
450 }
451 Ok(())
452 }
453
454 pub async fn build_linking_ticket(
468 &self,
469 new_device_id: DeviceId,
470 new_device_kp: Vec<u8>,
471 last_app_events: Vec<(ConversationId, Vec<u8>)>,
472 now_ms: u64,
473 ) -> Result<LinkingTicket> {
474 let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
475 let dg_id = device_group_id_for(self.identity.user_id());
476
477 let outcome = {
480 use std::collections::hash_map::Entry;
481 let mut conversations = self.conversations.write();
482 if let Entry::Vacant(e) = conversations.entry(dg_id) {
483 let mut new_dg = Conversation::create(
484 dg_id,
485 Some("device-group".into()),
486 self.local_device.device_id.clone(),
487 self.identity.user_id(),
488 self.crypto.clone(),
489 self.signing.clone(),
490 self.storage.clone(),
491 now_ms,
492 )?;
493 new_dg.meta.is_device_group = true;
494 e.insert(new_dg);
495 }
496 let dg = conversations
497 .get_mut(&dg_id)
498 .expect("DeviceGroup just inserted or already present");
499 dg.add_members(vec![(new_device_id.clone(), new_device_kp)], now_ms)?
503 };
504
505 let catchup_snapshot = if last_app_events.is_empty() && self.conversations.read().is_empty()
512 {
513 Vec::new()
515 } else {
516 let conversation_metas: Vec<CatchupConversationEntry> = self
517 .conversations
518 .read()
519 .values()
520 .map(|c| -> Result<CatchupConversationEntry> {
521 let group_bytes = c.export_state_snapshot(now_ms)?.to_vec();
525 Ok(CatchupConversationEntry {
526 conversation_id: c.id(),
527 meta: c.meta().clone(),
528 group_state_bytes: group_bytes,
529 })
530 })
531 .collect::<Result<_>>()?;
532 let last_app_events_per_conv: Vec<CatchupAppEventEntry> = last_app_events
533 .into_iter()
534 .map(|(conversation_id, app_event_bytes)| CatchupAppEventEntry {
535 conversation_id,
536 app_event_bytes,
537 })
538 .collect();
539 CatchupSnapshot {
540 v: CATCHUP_SNAPSHOT_VERSION,
541 conversation_metas,
542 last_app_events_per_conv,
543 }
544 .encode()?
545 };
546
547 Ok(LinkingTicket {
548 v: 1,
549 user_id: self.identity.user_id().clone(),
550 user_pubkey: self.identity.public_key().to_bytes().to_vec(),
551 new_device_id,
552 device_binding_sig,
553 device_group_welcome: outcome.welcome.payload,
554 catchup_snapshot,
555 })
556 }
557
558 pub async fn consume_linking_ticket(
561 self: &Arc<Self>,
562 ticket: &LinkingTicket,
563 now_ms: u64,
564 ) -> Result<()> {
565 let pk_bytes: [u8; 32] = ticket
567 .user_pubkey
568 .as_slice()
569 .try_into()
570 .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
571 let user_pk = ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes)
572 .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;
573 Identity::verify_device_binding(
574 &user_pk,
575 &ticket.user_id,
576 &ticket.new_device_id.0,
577 &ticket.device_binding_sig,
578 )?;
579 if ticket.new_device_id != self.local_device.device_id {
580 return Err(Error::Invalid(
581 "ticket addressed to a different device".into(),
582 ));
583 }
584
585 let dummy_env = MessageEnvelope::new(
586 ConversationId(device_group_id_for(&ticket.user_id).0),
587 0,
588 MessageKind::Welcome,
589 self.local_device.device_id.clone(),
590 0,
591 crate::clock::Hlc::ZERO,
592 ticket.device_group_welcome.clone(),
593 );
594 self.join_conversation(&dummy_env, now_ms).await?;
595 Ok(())
596 }
597
598 pub fn export_conversation_state_snapshot(
603 &self,
604 conv_id: ConversationId,
605 now_ms: u64,
606 ) -> Result<zeroize::Zeroizing<Vec<u8>>> {
607 let guard = self.conversations.read();
608 let convo = guard
609 .get(&conv_id)
610 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
611 convo.export_state_snapshot(now_ms)
612 }
613
614 pub async fn import_state_snapshot(
633 self: &Arc<Self>,
634 snapshot_bytes: &[u8],
635 now_ms: u64,
636 ) -> Result<ConversationId> {
637 use crate::device::GroupStateSnapshot;
638 let snap = GroupStateSnapshot::decode(snapshot_bytes)
639 .map_err(|e| Error::Invalid(format!("snapshot decode: {e}")))?;
640
641 if snap.openmls_storage_version != openmls_traits::storage::CURRENT_VERSION {
642 return Err(Error::Invalid(format!(
643 "snapshot openmls_storage_version={} not supported (this SDK supports v={})",
644 snap.openmls_storage_version,
645 openmls_traits::storage::CURRENT_VERSION
646 )));
647 }
648
649 let conv_id = snap.group_id;
650
651 if self.conversations.read().contains_key(&conv_id) {
655 return Err(Error::Invalid(format!(
656 "conversation {} already open; close before importing snapshot",
657 conv_id.as_hex()
658 )));
659 }
660
661 let entries: Vec<(Vec<u8>, Vec<u8>)> =
663 snap.entries.into_iter().map(|e| (e.key, e.value)).collect();
664 self.crypto
665 .import_entries(entries)
666 .map_err(|e| Error::Storage(format!("import entries: {e}")))?;
667
668 let meta = ConversationMeta {
672 id: conv_id,
673 name: None,
674 epoch: 0, member_count: 0,
676 is_device_group: false, created_at_ms: now_ms,
678 };
679 let convo = Conversation::load(
680 conv_id,
681 meta,
682 SyncCursor::default(),
683 std::collections::BTreeMap::new(),
684 self.local_device.device_id.clone(),
685 self.crypto.clone(),
686 self.signing.clone(),
687 self.storage.clone(),
688 now_ms,
689 )?
690 .ok_or_else(|| {
691 Error::Invalid(
692 "snapshot imported but OpenMLS could not load the group — snapshot may be incomplete or storage version mismatched"
693 .into(),
694 )
695 })?;
696
697 let live_epoch = convo.epoch();
700 let live_members = convo.group.members().count() as u32;
701 let mut convo = convo;
702 convo.meta.epoch = live_epoch;
703 convo.meta.member_count = live_members;
704 convo.snapshot_to_storage().await?;
705
706 self.conversations.write().insert(conv_id, convo);
707 Ok(conv_id)
708 }
709
710 pub fn export_conversation_secret(
716 &self,
717 conv_id: ConversationId,
718 label: &str,
719 context: &[u8],
720 length: usize,
721 ) -> Result<Zeroizing<Vec<u8>>> {
722 let guard = self.conversations.read();
723 let convo = guard
724 .get(&conv_id)
725 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
726 convo.export_secret(label, context, length)
727 }
728
729 #[allow(clippy::await_holding_lock)] pub async fn revoke_device(
750 &self,
751 device_id: DeviceId,
752 now_ms: u64,
753 ) -> Result<Vec<MessageEnvelope>> {
754 let targets: Vec<(ConversationId, u32)> = self
758 .conversations
759 .read()
760 .iter()
761 .filter_map(|(id, c)| c.leaf_index_of(&device_id).map(|leaf| (*id, leaf)))
762 .collect();
763
764 let mut envelopes = Vec::with_capacity(targets.len());
768 for (conv_id, leaf_index) in targets {
769 let envelope = {
770 let mut guard = self.conversations.write();
771 let convo = guard
772 .get_mut(&conv_id)
773 .ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
774 convo.remove_members(vec![leaf_index], now_ms)?
775 };
776 self.transport.send(envelope.clone()).await?;
777 if let Some(c) = self.conversations.read().get(&conv_id) {
778 c.snapshot_to_storage().await?;
779 }
780 envelopes.push(envelope);
781 }
782 Ok(envelopes)
783 }
784}
785
786fn device_group_id_for(user_id: &UserId) -> ConversationId {
787 let mut bytes = [0u8; 16];
790 bytes[0] = 0xFF;
791 bytes[1] = 0xDC; let h = codec::sha256(&user_id.0);
793 bytes[2..].copy_from_slice(&h[..14]);
794 ConversationId(bytes)
795}
796
797fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
798 use serde::Serialize;
799 #[derive(Serialize)]
800 struct Persisted<'a> {
801 device_id: &'a DeviceId,
802 label: &'a str,
803 created_at_ms: u64,
804 #[serde(with = "serde_bytes")]
805 signing_seed: &'a [u8],
806 }
807 codec::encode(&Persisted {
808 device_id: &d.device_id,
809 label: &d.label,
810 created_at_ms: d.created_at_ms,
811 signing_seed: d.signing.as_bytes(),
812 })
813}
814
815fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
816 use serde::Deserialize;
817 #[derive(Deserialize)]
818 struct Persisted {
819 device_id: DeviceId,
820 label: String,
821 created_at_ms: u64,
822 #[serde(with = "serde_bytes")]
823 signing_seed: Vec<u8>,
824 }
825 let p: Persisted = codec::decode(bytes)?;
826 let seed: [u8; 32] = p
827 .signing_seed
828 .as_slice()
829 .try_into()
830 .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;
831 let signing = ed25519_dalek::SigningKey::from_bytes(&seed);
832 Ok(LocalDevice {
833 device_id: p.device_id,
834 user_id,
835 label: p.label,
836 signing,
837 created_at_ms: p.created_at_ms,
838 })
839}