use openmls::framing::MlsMessageOut;
use openmls::prelude::{
tls_codec::Serialize as TlsSerialize, BasicCredential, Ciphersuite, CredentialWithKey,
KeyPackageBuilder,
};
use openmls_basic_credential::SignatureKeyPair;
use openmls_rust_crypto::OpenMlsRustCrypto;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use crate::{
codec,
conversation::{Conversation, ConversationId, ConversationMeta},
device::{DeviceId, DeviceInfo, LinkingTicket, LocalDevice},
error::{Error, Result},
identity::{Identity, UserId},
message::{IncomingMessage, MessageEnvelope, MessageKind},
storage::Storage,
sync::SyncCursor,
transport::Transport,
};
/// MLS ciphersuite used by this client: it selects the signature algorithm for the key pair
/// generated in `MessagingClient::init` and is the suite passed to `KeyPackageBuilder::build`.
const DEFAULT_CIPHERSUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
/// Everything `MessagingClient::init` needs to construct a client.
#[derive(Debug)]
pub struct ClientConfig {
/// Identity of the user; moved into the client.
pub identity: Identity,
/// Label given to a newly generated local device. Ignored when a persisted device record
/// already exists in `storage`.
pub device_label: String,
/// Storage backend; `init` reads and writes the local device record under ("device", "local").
pub storage: Arc<dyn Storage>,
/// Transport used by the client to send envelopes and fetch pending ones.
pub transport: Arc<dyn Transport>,
/// Caller-supplied timestamp in milliseconds, passed to `LocalDevice::generate` and to
/// conversation rehydration. NOTE(review): presumably Unix-epoch milliseconds — confirm.
pub now_ms: u64,
}
/// Client handle bundling the user's identity, the local device, MLS crypto state,
/// storage/transport backends and the in-memory conversation table.
pub struct MessagingClient {
/// Identity of the user; source of the user id, device-binding signatures and public key.
pub(crate) identity: Identity,
/// This device's record, loaded from storage or freshly generated during `init`.
pub(crate) local_device: LocalDevice,
/// OpenMLS crypto provider, shared with every conversation created or joined.
pub(crate) crypto: Arc<OpenMlsRustCrypto>,
/// MLS signature key pair created in `init`; shared with every conversation.
pub(crate) signing: Arc<SignatureKeyPair>,
/// Storage backend handed to conversations for snapshots.
pub(crate) storage: Arc<dyn Storage>,
/// Transport used to send envelopes and fetch pending ones.
pub(crate) transport: Arc<dyn Transport>,
/// Conversations known to this client, keyed by id and guarded by a `parking_lot::RwLock`.
conversations: RwLock<HashMap<ConversationId, Conversation>>,
}
/// Hand-written `Debug`: prints only the hex user id, the hex device id and the number of
/// conversations currently held, rather than every field of the client.
impl std::fmt::Debug for MessagingClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let user_id = self.identity.user_id().as_hex();
        let device_id = self.local_device.device_id.as_hex();
        let conversation_count = self.conversations.read().len();

        let mut out = f.debug_struct("MessagingClient");
        out.field("user_id", &user_id);
        out.field("device_id", &device_id);
        out.field("conversation_count", &conversation_count);
        out.finish()
    }
}
impl MessagingClient {
pub async fn init(cfg: ClientConfig) -> Result<Arc<Self>> {
let crypto = Arc::new(OpenMlsRustCrypto::default());
let local_device = match cfg.storage.get("device", "local").await? {
Some(bytes) => decode_local_device(&bytes, cfg.identity.user_id().clone())?,
None => {
let dev = LocalDevice::generate(
cfg.identity.user_id().clone(),
cfg.device_label,
cfg.now_ms,
);
let bytes = encode_local_device(&dev)?;
cfg.storage.put("device", "local", bytes).await?;
dev
}
};
let signing = Arc::new(
SignatureKeyPair::new(DEFAULT_CIPHERSUITE.signature_algorithm()).map_err(Error::mls)?,
);
let client = Arc::new(Self {
identity: cfg.identity,
local_device,
crypto,
signing,
storage: cfg.storage,
transport: cfg.transport,
conversations: RwLock::new(HashMap::new()),
});
client.rehydrate_conversations(cfg.now_ms).await?;
Ok(client)
}
/// Returns an owned copy of the user id taken from the client's identity.
pub fn user_id(&self) -> UserId {
    let id = self.identity.user_id();
    id.clone()
}
/// Returns an owned copy of the local device's id.
pub fn device_id(&self) -> DeviceId {
    let device = &self.local_device;
    device.device_id.clone()
}
/// Returns the `DeviceInfo` the local device produces for the timestamp `now_ms`.
pub fn device_info(&self, now_ms: u64) -> DeviceInfo {
    let device = &self.local_device;
    device.info(now_ms)
}
pub fn fresh_key_package(&self) -> Result<Vec<u8>> {
let credential_with_key = CredentialWithKey {
credential: BasicCredential::new(self.identity.user_id().0.clone()).into(),
signature_key: self.signing.public().to_vec().into(),
};
let bundle = KeyPackageBuilder::new()
.build(
DEFAULT_CIPHERSUITE,
self.crypto.as_ref(),
self.signing.as_ref(),
credential_with_key,
)
.map_err(Error::mls)?;
let msg: MlsMessageOut = bundle.key_package().clone().into();
msg.tls_serialize_detached().map_err(Error::mls)
}
/// Creates a new conversation with a fresh id, snapshots it to storage and registers it in
/// the in-memory table.
///
/// Returns the id of the new conversation.
///
/// # Errors
/// Propagates failures from `Conversation::create` and from the storage snapshot; on either
/// failure nothing is inserted into the table.
pub async fn create_conversation(
    self: &Arc<Self>,
    name: Option<String>,
    now_ms: u64,
) -> Result<ConversationId> {
    let conv_id = ConversationId::new();
    let creator_device = self.local_device.device_id.clone();
    let conversation = Conversation::create(
        conv_id,
        name,
        creator_device,
        self.identity.user_id(),
        Arc::clone(&self.crypto),
        Arc::clone(&self.signing),
        Arc::clone(&self.storage),
        now_ms,
    )?;

    // Persist first; the write lock is only taken after the await completes.
    conversation.snapshot_to_storage().await?;
    self.conversations.write().insert(conv_id, conversation);
    Ok(conv_id)
}
/// Joins a conversation from a Welcome envelope, snapshots it to storage and registers it in
/// the in-memory table.
///
/// Returns the id reported by the joined conversation.
///
/// # Errors
/// `Error::Invalid` when the envelope kind is not `MessageKind::Welcome`; otherwise
/// propagates failures from `Conversation::join` and from the storage snapshot.
pub async fn join_conversation(
    self: &Arc<Self>,
    welcome_envelope: &MessageEnvelope,
    now_ms: u64,
) -> Result<ConversationId> {
    let is_welcome = welcome_envelope.kind == MessageKind::Welcome;
    if !is_welcome {
        return Err(Error::Invalid("expected Welcome envelope".into()));
    }

    let joiner_device = self.local_device.device_id.clone();
    let conversation = Conversation::join(
        &welcome_envelope.payload,
        joiner_device,
        Arc::clone(&self.crypto),
        Arc::clone(&self.signing),
        Arc::clone(&self.storage),
        now_ms,
    )?;

    let conv_id = conversation.id();
    conversation.snapshot_to_storage().await?;
    self.conversations.write().insert(conv_id, conversation);
    Ok(conv_id)
}
/// Returns a cloned `ConversationMeta` for every conversation currently in the table.
///
/// The order follows the `HashMap` iteration order and is therefore unspecified.
pub fn list_conversations(&self) -> Vec<ConversationMeta> {
    let table = self.conversations.read();
    let mut metas = Vec::with_capacity(table.len());
    for conversation in table.values() {
        metas.push(conversation.meta.clone());
    }
    metas
}
pub async fn send(
&self,
conv_id: ConversationId,
plaintext: Vec<u8>,
now_ms: u64,
) -> Result<MessageEnvelope> {
let envelope = {
let mut guard = self.conversations.write();
let convo = guard
.get_mut(&conv_id)
.ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
convo.send_application(&plaintext, now_ms)?
};
self.transport.send(envelope.clone()).await?;
Ok(envelope)
}
#[allow(clippy::await_holding_lock)]
pub async fn add_members(
&self,
conv_id: ConversationId,
key_packages: Vec<Vec<u8>>,
now_ms: u64,
) -> Result<()> {
let outcome = {
let mut guard = self.conversations.write();
let convo = guard
.get_mut(&conv_id)
.ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
convo.add_members(key_packages, now_ms)?
};
self.transport.send(outcome.commit).await?;
self.transport.send(outcome.welcome).await?;
if let Some(c) = self.conversations.read().get(&conv_id) {
c.snapshot_to_storage().await?;
}
Ok(())
}
#[allow(clippy::await_holding_lock)] pub async fn remove_members(
&self,
conv_id: ConversationId,
leaf_indexes: Vec<u32>,
now_ms: u64,
) -> Result<()> {
let envelope = {
let mut guard = self.conversations.write();
let convo = guard
.get_mut(&conv_id)
.ok_or_else(|| Error::UnknownConversation(conv_id.as_hex()))?;
convo.remove_members(leaf_indexes, now_ms)?
};
self.transport.send(envelope).await?;
if let Some(c) = self.conversations.read().get(&conv_id) {
c.snapshot_to_storage().await?;
}
Ok(())
}
#[allow(clippy::await_holding_lock)] pub async fn process_envelope(
&self,
env: &MessageEnvelope,
now_ms: u64,
) -> Result<Option<IncomingMessage>> {
let mut guard = self.conversations.write();
let convo = match guard.get_mut(&env.conversation_id) {
Some(c) => c,
None => return Err(Error::UnknownConversation(env.conversation_id.as_hex())),
};
let out = convo.process(env, now_ms)?;
convo.snapshot_to_storage().await?;
Ok(out)
}
/// Fetches and processes pending envelopes for every known conversation.
///
/// For each conversation the transport is polled in pages of up to 256 envelopes starting at
/// the conversation's sync cursor; each envelope goes through [`Self::process_envelope`] and
/// every message it yields is collected into the returned vector. Paging for a conversation
/// stops on an empty page, on a page shorter than 256, or when the conversation is no longer
/// in the table.
///
/// # Errors
/// Stops at and returns the first transport or processing failure; messages collected up to
/// that point are dropped with the error.
pub async fn sync_conversations(&self, now_ms: u64) -> Result<Vec<IncomingMessage>> {
    // Snapshot ids and cursors up front; the read guard is released at the end of this
    // statement, so no lock is held while awaiting the transport.
    let pending: Vec<(ConversationId, SyncCursor)> = self
        .conversations
        .read()
        .iter()
        .map(|(id, c)| (*id, c.cursor.clone()))
        .collect();

    let mut delivered = Vec::new();
    for (conv_id, mut cursor) in pending {
        loop {
            let batch = self
                .transport
                .fetch_since(conv_id, cursor.clone(), 256)
                .await?;
            if batch.is_empty() {
                break;
            }
            for env in &batch {
                if let Some(msg) = self.process_envelope(env, now_ms).await? {
                    delivered.push(msg);
                }
            }
            if batch.len() < 256 {
                break;
            }

            // A full page means more may be waiting. Re-read the cursor from the conversation
            // before fetching again: reusing the cursor captured before the loop would request
            // the very same page on every iteration and never terminate.
            // NOTE(review): assumes processing an envelope advances `Conversation::cursor` —
            // confirm against `Conversation::process`.
            let refreshed = self
                .conversations
                .read()
                .get(&conv_id)
                .map(|c| c.cursor.clone());
            match refreshed {
                Some(next) => cursor = next,
                // The conversation disappeared while syncing; nothing left to page through.
                None => break,
            }
        }
    }
    Ok(delivered)
}
/// Walks the keys stored under "groups", decodes each `<id>/meta` entry together with its
/// sync cursor from "cursors", and logs it.
///
/// The decoded values are currently discarded: nothing is inserted into the conversation
/// table. Entries that are missing, or whose meta fails to decode, are skipped silently; an
/// absent, empty or undecodable cursor falls back to `SyncCursor::default()`.
///
/// # Errors
/// Propagates storage failures from listing keys and from reading entries.
async fn rehydrate_conversations(self: &Arc<Self>, _now_ms: u64) -> Result<()> {
    for path in self.storage.list_keys("groups", "").await? {
        // Only keys of the form "<id_hex>/meta" are of interest.
        let Some((id_hex, "meta")) = path.split_once('/') else {
            continue;
        };

        let Some(meta_bytes) = self.storage.get("groups", &path).await? else {
            continue;
        };
        let Ok(meta): std::result::Result<ConversationMeta, _> = codec::decode(&meta_bytes)
        else {
            continue;
        };

        let cursor = match self.storage.get("cursors", id_hex).await? {
            Some(raw) if !raw.is_empty() => SyncCursor::decode(&raw).unwrap_or_default(),
            _ => SyncCursor::default(),
        };

        tracing::debug!(
            target: "ping_core::client",
            convo = %id_hex,
            epoch = meta.epoch,
            "rehydrated conversation meta"
        );
        // Placeholder: the decoded state is not yet turned into a live conversation.
        let _ = (meta, cursor);
    }
    Ok(())
}
/// Builds a [`LinkingTicket`] that lets `new_device_id` join this user's device group.
///
/// Signs a device binding for the new device, looks up the user's device-group conversation
/// (creating and registering it, flagged `is_device_group`, when absent), adds the new
/// device's key package to it, and returns a version-1 ticket carrying the welcome payload.
/// `catchup_snapshot` is left empty.
///
/// # Errors
/// Propagates failures from `Conversation::create` and `Conversation::add_members`.
pub async fn build_linking_ticket(
    &self,
    new_device_id: DeviceId,
    new_device_kp: Vec<u8>,
    now_ms: u64,
) -> Result<LinkingTicket> {
    use std::collections::hash_map::Entry;

    let device_binding_sig = self.identity.sign_device_binding(&new_device_id.0);
    let dg_id = device_group_id_for(self.identity.user_id());

    // NOTE(review): only `outcome.welcome` is used below; the commit is neither sent through
    // the transport nor is the device group snapshotted to storage here — confirm intended.
    let outcome = {
        let mut conversations = self.conversations.write();
        // One entry lookup resolves to the existing group or inserts a new one, so there is
        // no second lookup that could fail.
        let dg = match conversations.entry(dg_id) {
            Entry::Occupied(slot) => slot.into_mut(),
            Entry::Vacant(slot) => {
                let mut new_dg = Conversation::create(
                    dg_id,
                    Some("device-group".into()),
                    self.local_device.device_id.clone(),
                    self.identity.user_id(),
                    self.crypto.clone(),
                    self.signing.clone(),
                    self.storage.clone(),
                    now_ms,
                )?;
                new_dg.meta.is_device_group = true;
                slot.insert(new_dg)
            }
        };
        dg.add_members(vec![new_device_kp], now_ms)?
    };

    Ok(LinkingTicket {
        v: 1,
        user_id: self.identity.user_id().clone(),
        user_pubkey: self.identity.public_key().to_bytes().to_vec(),
        new_device_id,
        device_binding_sig,
        device_group_welcome: outcome.welcome.payload,
        catchup_snapshot: Vec::new(),
    })
}
/// Validates a [`LinkingTicket`] and joins the device group it carries.
///
/// Checks, in order: the user public key is 32 bytes and parses as an Ed25519 verifying key;
/// the device-binding signature verifies; the ticket targets this device. The welcome payload
/// is then wrapped in a synthetic Welcome envelope and passed to
/// [`Self::join_conversation`].
///
/// # Errors
/// `Error::Identity` for a malformed public key, the error from
/// `Identity::verify_device_binding` for a bad binding, `Error::Invalid` when the ticket is
/// addressed to another device, and any failure from joining.
pub async fn consume_linking_ticket(
    self: &Arc<Self>,
    ticket: &LinkingTicket,
    now_ms: u64,
) -> Result<()> {
    let raw_key = <[u8; 32]>::try_from(ticket.user_pubkey.as_slice())
        .map_err(|_| Error::Identity("user_pubkey must be 32 bytes".into()))?;
    let verifying_key = ed25519_dalek::VerifyingKey::from_bytes(&raw_key)
        .map_err(|e| Error::Identity(format!("bad user pubkey: {e}")))?;

    Identity::verify_device_binding(
        &verifying_key,
        &ticket.user_id,
        &ticket.new_device_id.0,
        &ticket.device_binding_sig,
    )?;

    let own_device = &self.local_device.device_id;
    if ticket.new_device_id != *own_device {
        return Err(Error::Invalid(
            "ticket addressed to a different device".into(),
        ));
    }

    // The ticket carries only the raw welcome bytes, so wrap them in an envelope addressed
    // to the user's device group before handing them to `join_conversation`.
    let welcome = MessageEnvelope::new(
        device_group_id_for(&ticket.user_id),
        0,
        MessageKind::Welcome,
        own_device.clone(),
        0,
        crate::clock::Hlc::ZERO,
        ticket.device_group_welcome.clone(),
    );
    self.join_conversation(&welcome, now_ms).await?;
    Ok(())
}
pub async fn revoke_device(&self, _device_id: DeviceId, _now_ms: u64) -> Result<()> {
Err(Error::Invalid(
"revoke_device: not implemented in v0.1 — see roadmap".into(),
))
}
}
/// Derives the conversation id of a user's device group from the user id.
///
/// The 16-byte id is the fixed prefix `0xFF 0xDC` followed by the first 14 bytes of
/// `codec::sha256` over the user id bytes.
fn device_group_id_for(user_id: &UserId) -> ConversationId {
    let digest = codec::sha256(&user_id.0);
    let mut id = [0u8; 16];
    id[..2].copy_from_slice(&[0xFF, 0xDC]);
    id[2..].copy_from_slice(&digest[..14]);
    ConversationId(id)
}
/// Serializes the persistable part of a [`LocalDevice`] with `codec::encode`.
///
/// The record holds the device id, label, creation time and the signing key bytes; the user
/// id is not stored and must be supplied again when decoding.
fn encode_local_device(d: &LocalDevice) -> Result<Vec<u8>> {
    use serde::Serialize;

    // Borrowed view of the fields that go to storage.
    #[derive(Serialize)]
    struct Persisted<'a> {
        device_id: &'a DeviceId,
        label: &'a str,
        created_at_ms: u64,
        #[serde(with = "serde_bytes")]
        signing_seed: &'a [u8],
    }

    let record = Persisted {
        device_id: &d.device_id,
        label: d.label.as_str(),
        created_at_ms: d.created_at_ms,
        signing_seed: d.signing.as_bytes(),
    };
    codec::encode(&record)
}
/// Rebuilds a [`LocalDevice`] from bytes decoded with `codec::decode`, attaching the given
/// `user_id` (which is not part of the stored record).
///
/// # Errors
/// Propagates decode failures, and returns `Error::Invalid` when the stored signing seed is
/// not exactly 32 bytes long.
fn decode_local_device(bytes: &[u8], user_id: UserId) -> Result<LocalDevice> {
    use serde::Deserialize;

    // Owned counterpart of the record written to storage.
    #[derive(Deserialize)]
    struct Persisted {
        device_id: DeviceId,
        label: String,
        created_at_ms: u64,
        #[serde(with = "serde_bytes")]
        signing_seed: Vec<u8>,
    }

    let record: Persisted = codec::decode(bytes)?;
    let Persisted {
        device_id,
        label,
        created_at_ms,
        signing_seed,
    } = record;

    let seed = <[u8; 32]>::try_from(signing_seed.as_slice())
        .map_err(|_| Error::Invalid("device signing seed must be 32 bytes".into()))?;

    Ok(LocalDevice {
        device_id,
        user_id,
        label,
        signing: ed25519_dalek::SigningKey::from_bytes(&seed),
        created_at_ms,
    })
}