use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use hkdf::Hkdf;
use sha2::Sha256;
use x25519_dalek::{PublicKey as XPub, StaticSecret};
use crate::e2e::DEFAULT_TS_TOLERANCE_SECS;
use crate::e2e::chunker::split_plaintext;
use crate::e2e::crypto::{
aead::{self, SessionKey},
ecdh,
fingerprint::{Fingerprint, fingerprint},
identity::Identity,
sig,
};
use crate::e2e::error::{E2eError, Result};
use crate::e2e::handshake::{
KeyRekey, KeyReq, KeyRsp, RateLimiter, encode_keyrekey, encode_keyreq, encode_keyrsp,
signed_keyrekey_payload, signed_keyreq_payload, signed_keyrsp_payload,
};
use crate::e2e::keyring::{ChannelMode, IncomingSession, Keyring, PeerRecord, TrustStatus};
use crate::e2e::wire::{WireChunk, build_aad, fresh_msgid};
/// State kept for a KEYREQ that has been built locally and not yet answered.
///
/// Entries live in `E2eManager::pending`, keyed by `(channel, nonce)`.
struct PendingHandshake {
/// Channel the request was built for.
#[allow(dead_code, reason = "future diagnostics: pending channel listing")]
channel: String,
/// Peer the request was addressed to; `None` when built via `build_keyreq`.
peer_handle: Option<String>,
/// Raw ephemeral X25519 secret generated for this request; used later to
/// unwrap the session key carried by the matching KEYRSP.
eph_x25519_secret: [u8; 32],
}
/// A received KEYREQ parked until it is explicitly accepted.
///
/// Stored in `E2eManager::pending_inbound` under `(sender handle, channel)`.
#[derive(Debug, Clone)]
struct PendingInboundKeyReq {
/// The request as it was received.
req: KeyReq,
}
/// A rekey message queued for delivery to one recipient.
///
/// Drained by `E2eManager::take_pending_rekey_sends`.
#[derive(Debug, Clone)]
pub struct PendingRekeySend {
/// Handle of the recipient the message is meant for.
pub target_handle: String,
/// Encoded KEYREKEY wrapped in `\x01` delimiters, ready to be sent as-is.
pub notice_text: String,
}
/// A reciprocal KEYREQ queued after answering a peer's own KEYREQ.
///
/// Drained by `E2eManager::take_pending_outbound_keyreqs`.
#[derive(Debug, Clone)]
pub struct PendingOutboundKeyReq {
/// Handle of the peer the request should be sent to.
pub peer_handle: String,
/// Channel the request is scoped to.
pub channel: String,
/// The already-signed request.
pub req: KeyReq,
}
/// Record that a peer's KEYREQ is waiting for an accept decision.
///
/// Queued by the normal-mode KEYREQ path and drained by
/// `E2eManager::take_pending_accept_requests`.
#[derive(Debug, Clone)]
pub struct PendingAcceptRequest {
/// Nick of the requester, when the caller supplied one.
pub nick: Option<String>,
/// Handle of the requester.
pub handle: String,
/// Channel the request applies to.
pub channel: String,
}
/// Central coordinator for end-to-end encryption: owns the local identity,
/// the persistent keyring, and all in-memory handshake queues.
///
/// Every mutable queue sits behind its own `Mutex`, so all methods take `&self`.
pub struct E2eManager {
/// Local long-term identity (signing key pair).
identity: Identity,
/// Persistent store for peers, sessions and channel configuration.
keyring: Keyring,
/// Limiter consulted for incoming and outgoing KEYREQ traffic.
rate_limiter: Mutex<RateLimiter>,
/// Maximum accepted difference, in seconds, between the local clock and a
/// wire chunk's timestamp.
ts_tolerance_secs: i64,
/// Outbound handshakes awaiting a KEYRSP, keyed by `(channel, nonce)`.
pending: Mutex<HashMap<(String, [u8; 16]), PendingHandshake>>,
/// Trust conflicts recorded for later retrieval.
pending_trust_change: Mutex<Vec<PendingTrustNotice>>,
/// Rekey messages waiting to be sent.
pending_rekey_sends: Mutex<Vec<PendingRekeySend>>,
/// Reciprocal KEYREQs waiting to be sent.
pending_outbound_keyreqs: Mutex<Vec<PendingOutboundKeyReq>>,
/// Received KEYREQs parked for acceptance, keyed by `(handle, channel)`.
pending_inbound: Mutex<HashMap<(String, String), PendingInboundKeyReq>>,
/// Accept prompts matching the entries parked in `pending_inbound`.
pending_accept_requests: Mutex<Vec<PendingAcceptRequest>>,
}
/// Result of comparing an observed `(fingerprint, handle)` pair against the
/// keyring, as computed by `E2eManager::classify_peer_change`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrustChange {
/// Neither the fingerprint nor the handle is known.
New,
/// The fingerprint is known and was last seen under the same handle.
Known,
/// The fingerprint is known but was last seen under a different handle.
HandleChanged {
/// Handle previously recorded for the fingerprint (empty if none was stored).
old_handle: String,
/// Handle the fingerprint was observed under now.
new_handle: String,
/// The fingerprint in question.
fingerprint: Fingerprint,
},
/// The handle is known but is now presenting a different fingerprint.
FingerprintChanged {
/// Handle whose key changed.
handle: String,
/// Fingerprint stored for the handle.
old_fp: Fingerprint,
/// Fingerprint just observed.
new_fp: Fingerprint,
},
/// The fingerprint belongs to a peer whose global status is revoked.
Revoked {
/// Handle the revoked key was observed under.
handle: String,
/// The revoked fingerprint.
fingerprint: Fingerprint,
},
}
/// A trust conflict recorded while processing a handshake message.
///
/// Drained by `E2eManager::take_pending_trust_changes` and consulted by
/// `E2eManager::reverify_peer`.
#[derive(Debug, Clone)]
pub struct PendingTrustNotice {
/// Handle of the sender that triggered the conflict.
pub handle: String,
/// Channel named in the offending message.
pub channel: String,
/// What kind of conflict was detected.
pub change: TrustChange,
/// The sender's newly presented public key; set only for
/// `TrustChange::FingerprintChanged` notices.
pub new_pubkey: Option<[u8; 32]>,
}
/// What `E2eManager::reverify_peer` ended up doing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReverifyOutcome {
/// A queued fingerprint change was applied: the old key was replaced by the new one.
Applied {
/// Fingerprint that was removed.
old_fp: Fingerprint,
/// Fingerprint that is now trusted.
new_fp: Fingerprint,
},
/// No fingerprint change was queued; existing state for the peer was wiped.
Cleared {
/// Number of rows / entries removed.
deleted: usize
},
/// Nothing was stored for the peer.
NotFound,
}
/// Result of `E2eManager::decrypt_incoming`.
#[derive(Debug, Clone)]
pub enum DecryptOutcome {
/// Decrypted text, or the input line unchanged when it did not parse as a wire chunk.
Plaintext(String),
/// No incoming session exists for this sender on this channel.
MissingKey {
/// Handle of the sender.
handle: String,
/// Channel the message arrived on.
channel: String
},
/// The chunk was refused; the string carries the reason.
Rejected(String),
}
impl E2eManager {
/// Removes all in-memory pending state that belongs to `handle`.
///
/// Sweeps, in order: outbound handshakes addressed to the peer, parked
/// inbound KEYREQs, accept prompts, trust-change notices and queued
/// reciprocal KEYREQs. Each mutex is released before the next one is taken.
///
/// Returns the total number of entries dropped.
///
/// # Panics
/// Panics if any of the involved mutexes is poisoned.
fn clear_pending_state_for_handle(&self, handle: &str) -> usize {
    let handshakes_dropped = {
        let mut table = self.pending.lock().expect("e2e pending mutex poisoned");
        let before = table.len();
        table.retain(|_, entry| entry.peer_handle.as_deref() != Some(handle));
        before.saturating_sub(table.len())
    };

    let inbound_dropped = {
        let mut table = self
            .pending_inbound
            .lock()
            .expect("e2e pending inbound mutex poisoned");
        let before = table.len();
        table.retain(|(owner, _), _| owner != handle);
        before.saturating_sub(table.len())
    };

    let accepts_dropped = {
        let mut queue = self
            .pending_accept_requests
            .lock()
            .expect("e2e pending accept requests mutex poisoned");
        let before = queue.len();
        queue.retain(|entry| entry.handle != handle);
        before.saturating_sub(queue.len())
    };

    let notices_dropped = {
        let mut queue = self
            .pending_trust_change
            .lock()
            .expect("e2e pending trust change mutex poisoned");
        let before = queue.len();
        queue.retain(|entry| entry.handle != handle);
        before.saturating_sub(queue.len())
    };

    let outbound_dropped = {
        let mut queue = self
            .pending_outbound_keyreqs
            .lock()
            .expect("e2e pending outbound keyreqs mutex poisoned");
        let before = queue.len();
        queue.retain(|entry| entry.peer_handle != handle);
        before.saturating_sub(queue.len())
    };

    handshakes_dropped + inbound_dropped + accepts_dropped + notices_dropped + outbound_dropped
}
/// Builds a manager on top of `keyring` using the default timestamp tolerance.
///
/// # Errors
/// Propagates any failure from loading or creating the identity.
pub fn load_or_init(keyring: Keyring) -> Result<Self> {
    let tolerance = DEFAULT_TS_TOLERANCE_SECS;
    Self::load_or_init_with_tolerance(keyring, tolerance)
}
/// Builds a manager on top of `keyring`, taking the timestamp tolerance from `cfg`.
///
/// # Errors
/// Propagates any failure from loading or creating the identity.
pub fn load_or_init_with_config(
    keyring: Keyring,
    cfg: &crate::config::E2eConfig,
) -> Result<Self> {
    let tolerance = cfg.ts_tolerance_secs;
    Self::load_or_init_with_tolerance(keyring, tolerance)
}
/// Loads the stored identity from `keyring`, or generates and saves a new one,
/// then assembles a manager with empty in-memory queues.
///
/// A stored identity is cross-checked: the public key derived from the stored
/// secret must equal the stored public key, and its fingerprint must equal
/// the stored fingerprint.
///
/// # Errors
/// Returns `E2eError::Crypto` when either consistency check fails, and
/// propagates keyring or key-generation failures.
fn load_or_init_with_tolerance(keyring: Keyring, ts_tolerance_secs: i64) -> Result<Self> {
    let identity = match keyring.load_identity()? {
        Some((stored_pk, stored_sk, stored_fp, _created_at)) => {
            let loaded = Identity::from_secret_bytes(&stored_sk);
            let derived_pk = loaded.public_bytes();
            if derived_pk != stored_pk {
                return Err(E2eError::Crypto(
                    "stored identity public key does not match secret — keyring may be corrupted"
                        .into(),
                ));
            }
            if fingerprint(&derived_pk) != stored_fp {
                return Err(E2eError::Crypto(
                    "stored identity fingerprint does not match pubkey — keyring may be corrupted"
                        .into(),
                ));
            }
            loaded
        }
        None => {
            // First run: create an identity and persist it before use.
            let generated = Identity::generate()?;
            let public = generated.public_bytes();
            let secret = generated.secret_bytes();
            let fp = fingerprint(&public);
            keyring.save_identity(&public, &secret, &fp, now_unix())?;
            generated
        }
    };

    Ok(Self {
        identity,
        keyring,
        rate_limiter: Mutex::new(RateLimiter::new()),
        ts_tolerance_secs,
        pending: Mutex::new(HashMap::new()),
        pending_trust_change: Mutex::new(Vec::new()),
        pending_rekey_sends: Mutex::new(Vec::new()),
        pending_outbound_keyreqs: Mutex::new(Vec::new()),
        pending_inbound: Mutex::new(HashMap::new()),
        pending_accept_requests: Mutex::new(Vec::new()),
    })
}
/// Compares an observed `(fingerprint, handle)` pair with the keyring.
///
/// The lookup by fingerprint takes precedence: a revoked peer yields
/// `Revoked`, a differing stored handle yields `HandleChanged`, and a
/// matching one yields `Known`. Only when the fingerprint is unknown is the
/// handle looked up; a hit there yields `FingerprintChanged`, a miss `New`.
///
/// # Errors
/// Propagates keyring lookup failures.
fn classify_peer_change(&self, new_fp: &Fingerprint, new_handle: &str) -> Result<TrustChange> {
    if let Some(by_fp) = self.keyring.get_peer_by_fingerprint(new_fp)? {
        let verdict = if by_fp.global_status == TrustStatus::Revoked {
            TrustChange::Revoked {
                handle: new_handle.to_string(),
                fingerprint: *new_fp,
            }
        } else if by_fp.last_handle.as_deref() == Some(new_handle) {
            TrustChange::Known
        } else {
            TrustChange::HandleChanged {
                old_handle: by_fp.last_handle.unwrap_or_default(),
                new_handle: new_handle.to_string(),
                fingerprint: *new_fp,
            }
        };
        return Ok(verdict);
    }

    let verdict = match self.keyring.get_peer_by_handle(new_handle)? {
        Some(by_handle) => TrustChange::FingerprintChanged {
            handle: new_handle.to_string(),
            old_fp: by_handle.fingerprint,
            new_fp: *new_fp,
        },
        None => TrustChange::New,
    };
    Ok(verdict)
}
/// Appends `notice` to the queue of pending trust-change notices.
///
/// # Panics
/// Panics if the queue's mutex is poisoned.
fn record_trust_change(&self, notice: PendingTrustNotice) {
    let mut queue = self
        .pending_trust_change
        .lock()
        .expect("e2e pending trust change mutex poisoned");
    queue.push(notice);
}
/// Returns every queued trust-change notice and leaves the queue empty.
///
/// # Panics
/// Panics if the queue's mutex is poisoned.
pub fn take_pending_trust_changes(&self) -> Vec<PendingTrustNotice> {
    let mut queue = self
        .pending_trust_change
        .lock()
        .expect("e2e pending trust change mutex poisoned");
    queue.drain(..).collect()
}
/// Resolves a trust conflict for `nick_or_handle` after re-verification.
///
/// All queued trust-change notices for the handle are discarded. If one of
/// them is a `FingerprintChanged` notice that carries the new public key, the
/// first such notice is applied: the old peer row, the handle's incoming
/// sessions, its outgoing-recipient rows and its in-memory pending state are
/// removed, and the new key is stored as `Trusted`. The result is
/// `ReverifyOutcome::Applied`.
///
/// Without such a notice, whatever is stored for the handle is wiped and the
/// result is `Cleared { deleted }`, or `NotFound` when nothing was removed.
///
/// # Errors
/// Propagates keyring failures.
///
/// # Panics
/// Panics if one of the pending-state mutexes is poisoned.
pub fn reverify_peer(&self, nick_or_handle: &str) -> Result<ReverifyOutcome> {
    let mut notices_guard = self
        .pending_trust_change
        .lock()
        .expect("e2e pending trust change mutex poisoned");
    let mut applied: Option<(Fingerprint, Fingerprint, [u8; 32])> = None;
    let mut kept: Vec<PendingTrustNotice> = Vec::with_capacity(notices_guard.len());
    for notice in std::mem::take(&mut *notices_guard) {
        if notice.handle != nick_or_handle {
            kept.push(notice);
            continue;
        }
        // Only the first applicable fingerprint change is remembered; every
        // notice for this handle is dropped regardless of its kind.
        if applied.is_none()
            && let TrustChange::FingerprintChanged { old_fp, new_fp, .. } = &notice.change
            && let Some(pk) = notice.new_pubkey
        {
            applied = Some((*old_fp, *new_fp, pk));
        }
    }
    *notices_guard = kept;
    // Release the notice queue before touching the keyring and the other
    // queues (clear_pending_state_for_handle locks it again).
    drop(notices_guard);

    if let Some((old_fp, new_fp, new_pubkey)) = applied {
        self.keyring.delete_peer_by_fingerprint(&old_fp)?;
        self.keyring
            .delete_incoming_sessions_for_handle(nick_or_handle)?;
        self.keyring
            .delete_outgoing_recipients_for_handle(nick_or_handle)?;
        self.clear_pending_state_for_handle(nick_or_handle);
        let now = now_unix();
        self.keyring.upsert_peer(&PeerRecord {
            fingerprint: new_fp,
            pubkey: new_pubkey,
            last_handle: Some(nick_or_handle.to_string()),
            last_nick: None,
            first_seen: now,
            last_seen: now,
            global_status: TrustStatus::Trusted,
        })?;
        return Ok(ReverifyOutcome::Applied { old_fp, new_fp });
    }

    // No key change to apply: wipe what is stored for the handle.
    let mut deleted: usize = 0;
    if let Some(peer) = self.keyring.get_peer_by_handle(nick_or_handle)? {
        self.keyring.delete_peer_by_fingerprint(&peer.fingerprint)?;
        deleted += 1;
    }
    deleted += self
        .keyring
        .delete_incoming_sessions_for_handle(nick_or_handle)?;
    deleted += self
        .keyring
        .delete_outgoing_recipients_for_handle(nick_or_handle)?;
    deleted += self.clear_pending_state_for_handle(nick_or_handle);
    if deleted == 0 {
        Ok(ReverifyOutcome::NotFound)
    } else {
        Ok(ReverifyOutcome::Cleared { deleted })
    }
}
/// Fingerprint of the local identity's public key.
#[must_use]
pub fn fingerprint(&self) -> Fingerprint {
    let public = self.identity.public_bytes();
    fingerprint(&public)
}
#[must_use]
pub fn keyring(&self) -> &Keyring {
&self.keyring
}
#[must_use]
pub fn identity_pub(&self) -> [u8; 32] {
self.identity.public_bytes()
}
/// Encrypts `plaintext` for `channel` and returns one encoded wire line per chunk.
///
/// The channel's outgoing session key is fetched (or created) first, then the
/// text is split into chunks. All chunks share one message id and timestamp;
/// each carries its 1-based part number and the total, both bound into the AAD.
///
/// # Errors
/// Returns `E2eError::ChunkLimit` when the chunk count does not fit in a `u8`,
/// `E2eError::Wire` when the chunker yields nothing, and propagates keyring,
/// chunker, AEAD and encoding failures.
pub fn encrypt_outgoing(&self, channel: &str, plaintext: &str) -> Result<Vec<String>> {
    let session_key = self.get_or_generate_outgoing_key(channel)?;
    let pieces = split_plaintext(plaintext)?;
    let total = match u8::try_from(pieces.len()) {
        Ok(0) => return Err(E2eError::Wire("chunker produced zero chunks".into())),
        Ok(count) => count,
        Err(_) => return Err(E2eError::ChunkLimit(u8::MAX)),
    };
    let msgid = fresh_msgid();
    let ts = now_unix();

    let mut lines = Vec::with_capacity(pieces.len());
    // Part numbers are 1-based on the wire.
    for (plain, position) in pieces.iter().zip(1usize..) {
        let part = u8::try_from(position).map_err(|_| E2eError::ChunkLimit(u8::MAX))?;
        let aad = build_aad(channel, msgid, ts, part, total);
        let (nonce, ciphertext) = aead::encrypt(&session_key, &aad, plain)?;
        let chunk = WireChunk {
            msgid,
            ts,
            part,
            total,
            nonce,
            ciphertext,
        };
        lines.push(chunk.encode()?);
    }
    Ok(lines)
}
/// Returns the outgoing session key for `channel`, creating one when needed.
///
/// An existing session that is not flagged `pending_rotation` is returned
/// as-is. Otherwise a fresh key is generated and stored. If the replaced
/// session was flagged for rotation, a KEYREKEY is built for every recorded
/// recipient of the channel and queued in `pending_rekey_sends`; recipients
/// without a peer row, or for which building fails, are logged and skipped.
///
/// The session is read from the keyring exactly once, so the "return existing"
/// and "was a rotation pending" decisions are based on the same snapshot.
///
/// # Errors
/// Propagates keyring and key-generation failures.
///
/// # Panics
/// Panics if the rekey queue's mutex is poisoned.
fn get_or_generate_outgoing_key(&self, channel: &str) -> Result<SessionKey> {
    let had_pending_rotation = match self.keyring.get_outgoing_session(channel)? {
        Some(sess) if !sess.pending_rotation => return Ok(sess.sk),
        // Any session that reaches this arm is flagged for rotation.
        Some(_) => true,
        None => false,
    };

    let fresh = aead::generate_session_key()?;
    self.keyring
        .set_outgoing_session(channel, &fresh, now_unix())?;

    if had_pending_rotation {
        let recipients = self.keyring.list_outgoing_recipients(channel)?;
        if !recipients.is_empty() {
            let mut queue = self
                .pending_rekey_sends
                .lock()
                .expect("e2e pending rekey mutex poisoned");
            for (handle, fp) in recipients {
                let Some(peer) = self.keyring.get_peer_by_fingerprint(&fp)? else {
                    tracing::warn!(
                        "rekey: no peer row for fp={} (handle={}); skipping",
                        hex::encode(fp),
                        handle,
                    );
                    continue;
                };
                match self.build_rekey_for_peer(channel, &peer, &fresh) {
                    Ok(rk) => {
                        let body = format!("\x01{}\x01", encode_keyrekey(&rk));
                        queue.push(PendingRekeySend {
                            target_handle: handle,
                            notice_text: body,
                        });
                    }
                    Err(e) => {
                        tracing::warn!("rekey: build_rekey_for_peer failed for {handle}: {e}");
                    }
                }
            }
        }
    }
    Ok(fresh)
}
/// Builds a signed KEYREKEY that delivers `new_sk` for `channel` to `peer`.
///
/// A one-off X25519 key is generated and combined with the peer's identity
/// key (converted from Ed25519) via Diffie-Hellman. The wrap key is derived
/// with HKDF-SHA256 using the channel's rekey info string, which also serves
/// as AAD when sealing `new_sk`. The message is signed with the local identity.
///
/// # Errors
/// Propagates key-conversion and AEAD failures.
fn build_rekey_for_peer(
    &self,
    channel: &str,
    peer: &PeerRecord,
    new_sk: &SessionKey,
) -> Result<KeyRekey> {
    let mut eph_seed = [0u8; 32];
    rand::fill(&mut eph_seed);
    let eph_secret = StaticSecret::from(eph_seed);
    let eph_pub = XPub::from(&eph_secret).to_bytes();

    let peer_x_pub = XPub::from(ecdh::ed25519_pub_to_x25519(&peer.pubkey)?);
    let shared = eph_secret.diffie_hellman(&peer_x_pub);

    let info = rekey_info(channel);
    let mut wrap_key = [0u8; 32];
    Hkdf::<Sha256>::new(Some(b"RPE2E01-WRAP"), shared.as_bytes())
        .expand(info.as_bytes(), &mut wrap_key)
        .expect("hkdf expand 32 bytes never fails");
    let (wrap_nonce, wrap_ct) = aead::encrypt(&wrap_key, info.as_bytes(), new_sk)?;

    let mut nonce = [0u8; 16];
    rand::fill(&mut nonce);

    let pubkey = self.identity.public_bytes();
    let to_sign =
        signed_keyrekey_payload(channel, &pubkey, &eph_pub, &wrap_nonce, &wrap_ct, &nonce);
    let signature = sig::sign(self.identity.signing_key(), &to_sign);

    Ok(KeyRekey {
        channel: channel.to_string(),
        pubkey,
        eph_pub,
        wrap_nonce,
        wrap_ct,
        nonce,
        sig: signature,
    })
}
/// Returns every queued rekey message and leaves the queue empty.
///
/// # Panics
/// Panics if the queue's mutex is poisoned.
pub fn take_pending_rekey_sends(&self) -> Vec<PendingRekeySend> {
    let mut queue = self
        .pending_rekey_sends
        .lock()
        .expect("e2e pending rekey mutex poisoned");
    queue.drain(..).collect()
}
/// Returns every queued reciprocal KEYREQ and leaves the queue empty.
///
/// # Panics
/// Panics if the queue's mutex is poisoned.
pub fn take_pending_outbound_keyreqs(&self) -> Vec<PendingOutboundKeyReq> {
    let mut queue = self
        .pending_outbound_keyreqs
        .lock()
        .expect("e2e pending outbound keyreqs mutex poisoned");
    queue.drain(..).collect()
}
/// Returns every queued accept prompt and leaves the queue empty.
///
/// # Panics
/// Panics if the queue's mutex is poisoned.
pub fn take_pending_accept_requests(&self) -> Vec<PendingAcceptRequest> {
    let mut queue = self
        .pending_accept_requests
        .lock()
        .expect("e2e pending accept requests mutex poisoned");
    queue.drain(..).collect()
}
/// Drops everything held for `handle` on one `channel`.
///
/// Removes the incoming session (if any), the parked inbound KEYREQ, accept
/// prompts, outbound handshakes addressed to the peer on that channel, and
/// queued reciprocal KEYREQs. Returns how many items were removed in total.
///
/// # Errors
/// Propagates keyring failures.
///
/// # Panics
/// Panics if one of the pending-state mutexes is poisoned.
pub fn forget_peer_on_channel(&self, handle: &str, channel: &str) -> Result<usize> {
    let mut removed = 0usize;

    if self
        .keyring
        .get_incoming_session(handle, channel)?
        .is_some()
    {
        self.keyring.delete_incoming_session(handle, channel)?;
        removed += 1;
    }

    {
        let inbound_key = (handle.to_string(), channel.to_string());
        let mut inbound = self
            .pending_inbound
            .lock()
            .expect("e2e pending inbound mutex poisoned");
        if inbound.remove(&inbound_key).is_some() {
            removed += 1;
        }
    }

    {
        let mut prompts = self
            .pending_accept_requests
            .lock()
            .expect("e2e pending accept requests mutex poisoned");
        let before = prompts.len();
        prompts.retain(|entry| entry.handle != handle || entry.channel != channel);
        removed += before.saturating_sub(prompts.len());
    }

    {
        let mut handshakes = self.pending.lock().expect("e2e pending mutex poisoned");
        let before = handshakes.len();
        handshakes.retain(|(pending_channel, _), entry| {
            pending_channel != channel || entry.peer_handle.as_deref() != Some(handle)
        });
        removed += before.saturating_sub(handshakes.len());
    }

    {
        let mut outbound = self
            .pending_outbound_keyreqs
            .lock()
            .expect("e2e pending outbound keyreqs mutex poisoned");
        let before = outbound.len();
        outbound.retain(|entry| entry.peer_handle != handle || entry.channel != channel);
        removed += before.saturating_sub(outbound.len());
    }

    Ok(removed)
}
/// Drops everything held for `handle` across all channels.
///
/// Deletes the peer row found by handle, the handle's incoming sessions and
/// outgoing-recipient rows, and all in-memory pending state. Returns the
/// combined number of removed items.
///
/// # Errors
/// Propagates keyring failures.
pub fn forget_peer_everywhere(&self, handle: &str) -> Result<usize> {
    let peer_rows = match self.keyring.get_peer_by_handle(handle)? {
        Some(peer) => {
            self.keyring.delete_peer_by_fingerprint(&peer.fingerprint)?;
            1
        }
        None => 0,
    };
    let session_rows = self.keyring.delete_incoming_sessions_for_handle(handle)?;
    let recipient_rows = self.keyring.delete_outgoing_recipients_for_handle(handle)?;
    let pending_entries = self.clear_pending_state_for_handle(handle);
    Ok(peer_rows + session_rows + recipient_rows + pending_entries)
}
/// Processes a KEYREKEY from `sender_handle` and installs the new incoming key.
///
/// The signature is checked first. The sender must classify as `Known`;
/// an unknown sender is refused, and any trust conflict is recorded as a
/// pending notice before the mismatch error is returned. The wrapped key is
/// then recovered via X25519 (local identity converted from Ed25519) and
/// HKDF-SHA256, and stored as a `Trusted` incoming session.
///
/// # Errors
/// Returns `E2eError::Handshake` for unknown senders, the mismatch error for
/// trust conflicts, `E2eError::Crypto` when the unwrapped key is not 32 bytes,
/// and propagates signature, AEAD and keyring failures.
pub fn handle_rekey(&self, sender_handle: &str, rekey: &KeyRekey) -> Result<()> {
    let signed = signed_keyrekey_payload(
        &rekey.channel,
        &rekey.pubkey,
        &rekey.eph_pub,
        &rekey.wrap_nonce,
        &rekey.wrap_ct,
        &rekey.nonce,
    );
    sig::verify(&rekey.pubkey, &signed, &rekey.sig)?;

    let sender_fp = fingerprint(&rekey.pubkey);
    match self.classify_peer_change(&sender_fp, sender_handle)? {
        TrustChange::Known => {}
        TrustChange::New => {
            return Err(E2eError::Handshake(
                "REKEY from unknown peer; ignoring".into(),
            ));
        }
        conflict @ (TrustChange::HandleChanged { .. }
        | TrustChange::FingerprintChanged { .. }
        | TrustChange::Revoked { .. }) => {
            let mismatch = handle_mismatch_for(&conflict, sender_handle);
            // Only a changed fingerprint carries the new key along.
            let new_pubkey = matches!(conflict, TrustChange::FingerprintChanged { .. })
                .then_some(rekey.pubkey);
            self.record_trust_change(PendingTrustNotice {
                handle: sender_handle.to_string(),
                channel: rekey.channel.clone(),
                change: conflict,
                new_pubkey,
            });
            return Err(mismatch);
        }
    }

    let seed = self.identity.secret_bytes();
    let our_x_secret = StaticSecret::from(ecdh::ed25519_seed_to_x25519(&seed));
    let shared = our_x_secret.diffie_hellman(&XPub::from(rekey.eph_pub));

    let info = rekey_info(&rekey.channel);
    let mut wrap_key = [0u8; 32];
    Hkdf::<Sha256>::new(Some(b"RPE2E01-WRAP"), shared.as_bytes())
        .expand(info.as_bytes(), &mut wrap_key)
        .expect("hkdf expand 32 bytes never fails");

    let unwrapped = aead::decrypt(
        &wrap_key,
        &rekey.wrap_nonce,
        info.as_bytes(),
        &rekey.wrap_ct,
    )?;
    let Ok(new_sk) = <[u8; 32]>::try_from(&unwrapped[..]) else {
        return Err(E2eError::Crypto(format!(
            "rekey sk has unexpected length {}",
            unwrapped.len()
        )));
    };

    self.keyring
        .install_incoming_session_strict(&IncomingSession {
            handle: sender_handle.to_string(),
            channel: rekey.channel.clone(),
            fingerprint: sender_fp,
            sk: new_sk,
            status: TrustStatus::Trusted,
            created_at: now_unix(),
        })?;
    Ok(())
}
/// Attempts to decrypt one incoming line from `sender_handle` on `channel`.
///
/// * A line that does not parse as a wire chunk is returned unchanged as
///   `DecryptOutcome::Plaintext`.
/// * A chunk whose timestamp differs from the local clock by more than
///   `ts_tolerance_secs` is `Rejected`.
/// * With no incoming session for the sender/channel the result is `MissingKey`.
/// * A session that is not `Trusted`, an AEAD failure, or non-UTF-8 plaintext
///   all produce `Rejected` with a reason.
///
/// # Errors
/// Propagates wire-parsing and keyring failures.
pub fn decrypt_incoming(
    &self,
    sender_handle: &str,
    channel: &str,
    wire_line: &str,
) -> Result<DecryptOutcome> {
    let Some(wire) = WireChunk::parse(wire_line)? else {
        return Ok(DecryptOutcome::Plaintext(wire_line.to_string()));
    };

    // `wire.ts` comes from the peer, so it may be any i64. `abs_diff` cannot
    // overflow, unlike `(now - ts).abs()`, which panics in debug builds and
    // wraps in release builds for extreme values.
    let skew = now_unix().abs_diff(wire.ts);
    // A negative tolerance admits nothing, as before.
    let within_window =
        u64::try_from(self.ts_tolerance_secs).is_ok_and(|tolerance| skew <= tolerance);
    if !within_window {
        return Ok(DecryptOutcome::Rejected(format!(
            "ts outside tolerance window ({skew}s skew)"
        )));
    }

    let Some(sess) = self.keyring.get_incoming_session(sender_handle, channel)? else {
        return Ok(DecryptOutcome::MissingKey {
            handle: sender_handle.to_string(),
            channel: channel.to_string(),
        });
    };
    if sess.status != TrustStatus::Trusted {
        return Ok(DecryptOutcome::Rejected(format!(
            "peer not trusted (status={:?})",
            sess.status
        )));
    }

    // The AAD binds channel, message id, timestamp and chunk position.
    let aad = build_aad(channel, wire.msgid, wire.ts, wire.part, wire.total);
    match aead::decrypt(&sess.sk, &wire.nonce, &aad, &wire.ciphertext) {
        Ok(pt) => match String::from_utf8(pt) {
            Ok(s) => Ok(DecryptOutcome::Plaintext(s)),
            Err(e) => Ok(DecryptOutcome::Rejected(format!("utf8: {e}"))),
        },
        Err(e) => Ok(DecryptOutcome::Rejected(format!("aead failed: {e}"))),
    }
}
/// Builds a signed KEYREQ for `channel` that is not addressed to a specific peer.
///
/// # Errors
/// Propagates failures from `build_keyreq_for_peer`.
pub fn build_keyreq(&self, channel: &str) -> Result<KeyReq> {
    let untargeted: Option<&str> = None;
    self.build_keyreq_for_peer(channel, untargeted)
}
/// Builds a signed KEYREQ for `channel` and remembers it as a pending handshake.
///
/// A random 16-byte nonce and a fresh ephemeral X25519 secret are generated.
/// The secret is stored in `pending` under `(channel, nonce)` together with
/// the optional `peer_handle`, so a later KEYRSP can be unwrapped.
///
/// # Panics
/// Panics if the pending-handshake mutex is poisoned.
pub fn build_keyreq_for_peer(
    &self,
    channel: &str,
    peer_handle: Option<&str>,
) -> Result<KeyReq> {
    let mut nonce = [0u8; 16];
    rand::fill(&mut nonce);
    let mut eph_secret = [0u8; 32];
    rand::fill(&mut eph_secret);
    let eph_x25519 = XPub::from(&StaticSecret::from(eph_secret)).to_bytes();

    let pubkey = self.identity.public_bytes();
    let to_sign = signed_keyreq_payload(channel, &pubkey, &eph_x25519, &nonce);
    let signature = sig::sign(self.identity.signing_key(), &to_sign);

    let handshake = PendingHandshake {
        channel: channel.to_string(),
        peer_handle: peer_handle.map(str::to_owned),
        eph_x25519_secret: eph_secret,
    };
    self.pending
        .lock()
        .expect("e2e pending mutex poisoned")
        .insert((channel.to_string(), nonce), handshake);

    Ok(KeyReq {
        channel: channel.to_string(),
        pubkey,
        eph_x25519,
        nonce,
        sig: signature,
    })
}
/// Reports whether at least one outbound handshake is pending for `channel`.
///
/// # Panics
/// Panics if the pending-handshake mutex is poisoned.
#[must_use]
pub fn has_pending_keyreq(&self, channel: &str) -> bool {
    let table = self.pending.lock().expect("e2e pending mutex poisoned");
    table
        .iter()
        .any(|((pending_channel, _), _)| pending_channel == channel)
}
/// Encodes `req` and wraps it in `\x01` delimiters.
#[must_use]
#[allow(
    clippy::unused_self,
    reason = "method form is kept for symmetry with the rest of the public API; \
future versions may bind per-instance state (e.g. NOTICE ID counters)"
)]
pub fn encode_keyreq_ctcp(&self, req: &KeyReq) -> String {
    let encoded = encode_keyreq(req);
    format!("\x01{encoded}\x01")
}
/// Asks the rate limiter whether an outgoing KEYREQ to `peer_handle` is allowed.
///
/// # Panics
/// Panics if the rate limiter's mutex is poisoned.
pub fn allow_keyreq(&self, peer_handle: &str) -> bool {
    let mut limiter = self
        .rate_limiter
        .lock()
        .expect("rate limiter mutex poisoned");
    limiter.allow_outgoing(peer_handle)
}
/// Asks the rate limiter whether an incoming KEYREQ from `peer_handle` is allowed.
///
/// # Panics
/// Panics if the rate limiter's mutex is poisoned.
pub fn allow_incoming_keyreq(&self, peer_handle: &str) -> bool {
    let mut limiter = self
        .rate_limiter
        .lock()
        .expect("rate limiter mutex poisoned");
    limiter.allow_incoming(peer_handle)
}
/// Determines which mode applies to `sender_handle` on `channel`.
///
/// Yields `None` when the channel has no configuration or is not enabled.
/// An autotrust match for the sender overrides the configured mode with
/// `ChannelMode::AutoAccept`.
///
/// # Errors
/// Propagates keyring failures.
fn effective_channel_mode(
    &self,
    sender_handle: &str,
    channel: &str,
) -> Result<Option<ChannelMode>> {
    match self.keyring.get_channel_config(channel)? {
        Some(config) if config.enabled => {
            let mode = if self.keyring.autotrust_matches(sender_handle, channel)? {
                ChannelMode::AutoAccept
            } else {
                config.mode
            };
            Ok(Some(mode))
        }
        _ => Ok(None),
    }
}
/// Updates the peer table for a KEYREQ sender, or records a trust conflict.
///
/// A `New` sender is stored with `Pending` status; a `Known` sender is
/// re-stored with the status already on file (falling back to `Pending`).
/// Any other classification is queued as a pending trust notice and turned
/// into a mismatch error.
///
/// # Errors
/// Returns the mismatch error for trust conflicts and propagates keyring
/// failures.
fn tofu_upsert_on_keyreq(
    &self,
    sender_handle: &str,
    sender_nick: Option<&str>,
    req: &KeyReq,
    fp: &Fingerprint,
) -> Result<()> {
    let now = now_unix();
    let global_status = match self.classify_peer_change(fp, sender_handle)? {
        TrustChange::New => TrustStatus::Pending,
        TrustChange::Known => self
            .keyring
            .get_peer_by_fingerprint(fp)?
            .map_or(TrustStatus::Pending, |peer| peer.global_status),
        conflict @ (TrustChange::HandleChanged { .. }
        | TrustChange::FingerprintChanged { .. }
        | TrustChange::Revoked { .. }) => {
            let mismatch = handle_mismatch_for(&conflict, sender_handle);
            // Only a changed fingerprint carries the new key along.
            let new_pubkey = matches!(conflict, TrustChange::FingerprintChanged { .. })
                .then_some(req.pubkey);
            self.record_trust_change(PendingTrustNotice {
                handle: sender_handle.to_string(),
                channel: req.channel.clone(),
                change: conflict,
                new_pubkey,
            });
            return Err(mismatch);
        }
    };

    self.keyring.upsert_peer(&PeerRecord {
        fingerprint: *fp,
        pubkey: req.pubkey,
        last_handle: Some(sender_handle.to_string()),
        last_nick: sender_nick.map(str::to_owned),
        first_seen: now,
        last_seen: now,
        global_status,
    })?;
    Ok(())
}
/// Parks a normal-mode KEYREQ until it is accepted.
///
/// Installs a placeholder incoming session (all-zero key, `Pending` status),
/// stores the request in `pending_inbound` under `(handle, channel)` and
/// queues an accept prompt. A failure to install the placeholder is only
/// logged; the request is still parked.
///
/// # Panics
/// Panics if one of the involved mutexes is poisoned.
fn cache_pending_inbound_normal_mode(
    &self,
    sender_handle: &str,
    sender_nick: Option<&str>,
    req: &KeyReq,
    fp: &Fingerprint,
) {
    let handle = sender_handle.to_string();
    let channel = req.channel.clone();

    let placeholder = IncomingSession {
        handle: handle.clone(),
        channel: channel.clone(),
        fingerprint: *fp,
        sk: [0u8; 32],
        status: TrustStatus::Pending,
        created_at: now_unix(),
    };
    if let Err(e) = self.keyring.install_incoming_session_strict(&placeholder) {
        tracing::warn!("normal-mode pending session install failed: {e}");
    }

    let parked = PendingInboundKeyReq { req: req.clone() };
    self.pending_inbound
        .lock()
        .expect("e2e pending inbound mutex poisoned")
        .insert((handle.clone(), channel.clone()), parked);

    let prompt = PendingAcceptRequest {
        nick: sender_nick.map(str::to_owned),
        handle,
        channel,
    };
    self.pending_accept_requests
        .lock()
        .expect("e2e pending accept mutex poisoned")
        .push(prompt);
}
/// Processes an incoming KEYREQ when the sender's nick is not known.
///
/// # Errors
/// Propagates failures from `handle_keyreq_with_nick`.
pub fn handle_keyreq(&self, sender_handle: &str, req: &KeyReq) -> Result<Option<KeyRsp>> {
    let no_nick: Option<&str> = None;
    self.handle_keyreq_with_nick(sender_handle, no_nick, req)
}
/// Processes an incoming KEYREQ from `sender_handle` and decides whether to
/// answer it with a KEYRSP.
///
/// Steps, in order: incoming rate-limit check, signature verification,
/// channel-mode lookup, peer bookkeeping / trust-conflict detection, then a
/// mode-dependent decision:
/// * `AutoAccept` always answers;
/// * `Normal` answers only if a trusted incoming session already exists,
///   otherwise the request is parked for acceptance and `Ok(None)` is returned;
/// * `Quiet` answers only if a trusted incoming session already exists.
///
/// When answering, the channel's outgoing key is wrapped for the requester,
/// the requester is recorded as an outgoing recipient, and, if no trusted
/// incoming session exists and the outgoing limiter permits, a reciprocal
/// KEYREQ is queued in `pending_outbound_keyreqs`.
///
/// Returns `Ok(None)` whenever no response should be sent.
///
/// # Errors
/// `E2eError::RateLimit` when the incoming limiter refuses the sender;
/// otherwise propagates signature, trust-conflict, keyring and AEAD errors.
///
/// # Panics
/// Panics if the rate limiter or a pending-state mutex is poisoned.
pub fn handle_keyreq_with_nick(
&self,
sender_handle: &str,
sender_nick: Option<&str>,
req: &KeyReq,
) -> Result<Option<KeyRsp>> {
// The rate limit is checked before any signature work is done.
if !self
.rate_limiter
.lock()
.expect("rate limiter mutex poisoned")
.allow_incoming(sender_handle)
{
return Err(E2eError::RateLimit(sender_handle.to_string()));
}
let sig_payload =
signed_keyreq_payload(&req.channel, &req.pubkey, &req.eph_x25519, &req.nonce);
sig::verify(&req.pubkey, &sig_payload, &req.sig)?;
// No enabled channel config means the request is silently ignored.
let Some(effective_mode) = self.effective_channel_mode(sender_handle, &req.channel)? else {
return Ok(None);
};
let fp = fingerprint(&req.pubkey);
// Fails (after queuing a trust notice) on handle/fingerprint conflicts or revoked keys.
self.tofu_upsert_on_keyreq(sender_handle, sender_nick, req, &fp)?;
let already_trusted = self
.keyring
.get_incoming_session(sender_handle, &req.channel)?
.is_some_and(|s| s.status == TrustStatus::Trusted);
let allow = match effective_mode {
ChannelMode::AutoAccept => true,
ChannelMode::Normal => {
if already_trusted {
true
} else {
// Park the request; a KEYRSP is produced later by `accept_pending_inbound`.
self.cache_pending_inbound_normal_mode(sender_handle, sender_nick, req, &fp);
return Ok(None);
}
}
ChannelMode::Quiet => already_trusted,
};
if !allow {
return Ok(None);
}
let our_sk = self.get_or_generate_outgoing_key(&req.channel)?;
self.keyring.record_outgoing_recipient(
&req.channel,
sender_handle,
&fingerprint(&req.pubkey),
now_unix(),
)?;
// Fresh ephemeral key for this response; the wrap key is derived from it
// and the requester's ephemeral public key.
let mut our_eph_secret = [0u8; 32];
rand::fill(&mut our_eph_secret);
let our_eph_sec = StaticSecret::from(our_eph_secret);
let our_eph_pub = XPub::from(&our_eph_sec).to_bytes();
let info = wrap_info(&req.channel);
let wrap_key = derive_wrap_key(&our_eph_sec, &req.eph_x25519, info.as_bytes());
// The info string doubles as AAD for the wrapped session key.
let (wrap_nonce, wrap_ct) = aead::encrypt(&wrap_key, info.as_bytes(), &our_sk)?;
let our_pubkey = self.identity.public_bytes();
let mut rsp_nonce = [0u8; 16];
rand::fill(&mut rsp_nonce);
let sig_payload = signed_keyrsp_payload(
&req.channel,
&our_pubkey,
&our_eph_pub,
&wrap_nonce,
&wrap_ct,
&rsp_nonce,
);
let sig_bytes = sig::sign(self.identity.signing_key(), &sig_payload);
// NOTE(review): this repeats the `already_trusted` lookup above; nothing in
// between appears to modify incoming sessions — confirm before merging them.
let already_incoming = self
.keyring
.get_incoming_session(sender_handle, &req.channel)?
.is_some_and(|s| s.status == TrustStatus::Trusted);
// NOTE(review): the outgoing limiter is consulted even when `already_incoming`
// is true and no reciprocal request will be sent; if `allow_outgoing` records
// the attempt, this spends budget needlessly — verify against RateLimiter.
let allow_out = self
.rate_limiter
.lock()
.expect("rate limiter mutex poisoned")
.allow_outgoing(sender_handle);
if !already_incoming && allow_out {
// We do not hold a trusted key from this peer yet: ask for theirs in return.
let reciprocal = self.build_keyreq_for_peer(&req.channel, Some(sender_handle))?;
self.pending_outbound_keyreqs
.lock()
.expect("e2e pending outbound keyreqs mutex poisoned")
.push(PendingOutboundKeyReq {
peer_handle: sender_handle.to_string(),
channel: req.channel.clone(),
req: reciprocal,
});
}
Ok(Some(KeyRsp {
channel: req.channel.clone(),
pubkey: our_pubkey,
ephemeral_pub: our_eph_pub,
wrap_nonce,
wrap_ct,
nonce: rsp_nonce,
sig: sig_bytes,
}))
}
/// Accepts a parked KEYREQ from `sender_handle` on `channel`, if there is one.
///
/// The parked request is removed from `pending_inbound` and answered with a
/// KEYRSP. Returns `Ok(None)` when nothing was parked for that pair.
///
/// # Errors
/// Propagates failures from building the response.
///
/// # Panics
/// Panics if the pending-inbound mutex is poisoned.
pub fn accept_pending_inbound(
    &self,
    sender_handle: &str,
    channel: &str,
) -> Result<Option<KeyRsp>> {
    let lookup_key = (sender_handle.to_string(), channel.to_string());
    // The guard is a statement temporary, so the lock is released before the
    // response is built.
    let parked = self
        .pending_inbound
        .lock()
        .expect("e2e pending inbound mutex poisoned")
        .remove(&lookup_key);

    match parked {
        Some(PendingInboundKeyReq { req }) => {
            let rsp = self.build_keyrsp_for_accepted_request(sender_handle, &req)?;
            Ok(Some(rsp))
        }
        None => Ok(None),
    }
}
/// Builds the KEYRSP for a KEYREQ that has been explicitly accepted.
///
/// The requester is stored as a `Trusted` peer (keeping a previously stored
/// nick and first-seen time when a row for the fingerprint exists), recorded
/// as an outgoing recipient of the channel, and sent the channel's outgoing
/// key wrapped under a key derived from a fresh ephemeral X25519 pair. If no
/// trusted incoming session exists for the requester and the outgoing limiter
/// permits, a reciprocal KEYREQ is queued in `pending_outbound_keyreqs`.
///
/// # Errors
/// Propagates keyring and AEAD failures.
///
/// # Panics
/// Panics if the rate limiter or a pending-state mutex is poisoned.
fn build_keyrsp_for_accepted_request(
&self,
sender_handle: &str,
req: &KeyReq,
) -> Result<KeyRsp> {
let fp = fingerprint(&req.pubkey);
let now = now_unix();
let existing_peer = self.keyring.get_peer_by_fingerprint(&fp)?;
// Acceptance promotes the peer to Trusted while preserving history fields.
let peer_rec = PeerRecord {
fingerprint: fp,
pubkey: req.pubkey,
last_handle: Some(sender_handle.to_string()),
last_nick: existing_peer.as_ref().and_then(|p| p.last_nick.clone()),
first_seen: existing_peer.as_ref().map_or(now, |p| p.first_seen),
last_seen: now,
global_status: TrustStatus::Trusted,
};
self.keyring.upsert_peer(&peer_rec)?;
let our_sk = self.get_or_generate_outgoing_key(&req.channel)?;
self.keyring
.record_outgoing_recipient(&req.channel, sender_handle, &fp, now_unix())?;
// Fresh ephemeral key for this response; the wrap key is derived from it
// and the requester's ephemeral public key.
let mut our_eph_secret = [0u8; 32];
rand::fill(&mut our_eph_secret);
let our_eph_sec = StaticSecret::from(our_eph_secret);
let our_eph_pub = XPub::from(&our_eph_sec).to_bytes();
let info = wrap_info(&req.channel);
let wrap_key = derive_wrap_key(&our_eph_sec, &req.eph_x25519, info.as_bytes());
// The info string doubles as AAD for the wrapped session key.
let (wrap_nonce, wrap_ct) = aead::encrypt(&wrap_key, info.as_bytes(), &our_sk)?;
let our_pubkey = self.identity.public_bytes();
let mut rsp_nonce = [0u8; 16];
rand::fill(&mut rsp_nonce);
let sig_payload = signed_keyrsp_payload(
&req.channel,
&our_pubkey,
&our_eph_pub,
&wrap_nonce,
&wrap_ct,
&rsp_nonce,
);
let sig_bytes = sig::sign(self.identity.signing_key(), &sig_payload);
let already_incoming = self
.keyring
.get_incoming_session(sender_handle, &req.channel)?
.is_some_and(|s| s.status == TrustStatus::Trusted);
// NOTE(review): the outgoing limiter is consulted even when `already_incoming`
// is true and no reciprocal request will be sent — verify against RateLimiter
// whether that spends budget.
let allow_out = self
.rate_limiter
.lock()
.expect("rate limiter mutex poisoned")
.allow_outgoing(sender_handle);
if !already_incoming && allow_out {
// We do not hold a trusted key from this peer yet: ask for theirs in return.
let reciprocal = self.build_keyreq_for_peer(&req.channel, Some(sender_handle))?;
self.pending_outbound_keyreqs
.lock()
.expect("e2e pending outbound keyreqs mutex poisoned")
.push(PendingOutboundKeyReq {
peer_handle: sender_handle.to_string(),
channel: req.channel.clone(),
req: reciprocal,
});
}
Ok(KeyRsp {
channel: req.channel.clone(),
pubkey: our_pubkey,
ephemeral_pub: our_eph_pub,
wrap_nonce,
wrap_ct,
nonce: rsp_nonce,
sig: sig_bytes,
})
}
/// Encodes `rsp` and wraps it in `\x01` delimiters.
#[must_use]
#[allow(
    clippy::unused_self,
    reason = "method form is kept for symmetry with the rest of the public API"
)]
pub fn encode_keyrsp_ctcp(&self, rsp: &KeyRsp) -> String {
    let encoded = encode_keyrsp(rsp);
    format!("\x01{encoded}\x01")
}
/// Encodes `rk` and wraps it in `\x01` delimiters.
#[must_use]
#[allow(
    clippy::unused_self,
    reason = "method form is kept for symmetry with the rest of the public API"
)]
pub fn encode_keyrekey_ctcp(&self, rk: &KeyRekey) -> String {
    let encoded = encode_keyrekey(rk);
    format!("\x01{encoded}\x01")
}
/// Finds the pending handshake that `rsp` answers, removes it, and returns the
/// unwrapped 32-byte session key.
///
/// Every pending handshake for `rsp.channel` is tried: its ephemeral secret is
/// combined with the responder's ephemeral public key to derive a wrap key,
/// and the first one that decrypts `rsp.wrap_ct` to exactly 32 bytes wins.
///
/// The whole find-and-remove sequence runs under a single lock on `pending`,
/// so a given pending handshake can be consumed by at most one response even
/// when responses are processed concurrently.
///
/// # Errors
/// `E2eError::Handshake` when nothing is pending for the channel;
/// `E2eError::Crypto` when no pending handshake unwraps the response.
///
/// # Panics
/// Panics if the pending-handshake mutex is poisoned.
fn consume_matching_pending_for_keyrsp(&self, rsp: &KeyRsp) -> Result<[u8; 32]> {
    let info = wrap_info(&rsp.channel);
    let mut pending = self.pending.lock().expect("e2e pending mutex poisoned");

    let mut saw_candidate = false;
    let mut matched: Option<((String, [u8; 16]), [u8; 32])> = None;
    for (key, handshake) in pending.iter().filter(|(key, _)| key.0 == rsp.channel) {
        saw_candidate = true;
        let our_sec = StaticSecret::from(handshake.eph_x25519_secret);
        let wrap_key = derive_wrap_key(&our_sec, &rsp.ephemeral_pub, info.as_bytes());
        // A failed decrypt only means this candidate is not the one answered.
        let Ok(sk_bytes) =
            aead::decrypt(&wrap_key, &rsp.wrap_nonce, info.as_bytes(), &rsp.wrap_ct)
        else {
            continue;
        };
        if sk_bytes.len() != 32 {
            continue;
        }
        let mut sk_arr = [0u8; 32];
        sk_arr.copy_from_slice(&sk_bytes);
        matched = Some((key.clone(), sk_arr));
        break;
    }

    if !saw_candidate {
        return Err(E2eError::Handshake(
            "no pending handshake for channel".into(),
        ));
    }
    let Some((matched_key, sk)) = matched else {
        return Err(E2eError::Crypto(
            "KEYRSP did not match any pending handshake for channel".into(),
        ));
    };
    pending.remove(&matched_key);
    Ok(sk)
}
/// Processes a KEYRSP from `sender_handle` and installs the delivered key.
///
/// The signature is verified, then the response is matched against (and
/// consumes) a pending handshake to recover the session key. A `New` or
/// `Known` sender is stored as a `Trusted` peer; any trust conflict is queued
/// as a pending notice and returned as a mismatch error. Finally a `Trusted`
/// incoming session is installed; if that is refused with a handle mismatch,
/// a `FingerprintChanged` notice is queued before the error is returned.
///
/// # Errors
/// Propagates signature, handshake-matching, trust-conflict and keyring errors.
pub fn handle_keyrsp(&self, sender_handle: &str, rsp: &KeyRsp) -> Result<()> {
    let sender_pubkey = rsp.pubkey;
    let signed = signed_keyrsp_payload(
        &rsp.channel,
        &sender_pubkey,
        &rsp.ephemeral_pub,
        &rsp.wrap_nonce,
        &rsp.wrap_ct,
        &rsp.nonce,
    );
    sig::verify(&sender_pubkey, &signed, &rsp.sig)?;

    let sk = self.consume_matching_pending_for_keyrsp(rsp)?;
    let fp = fingerprint(&sender_pubkey);
    let now = now_unix();

    match self.classify_peer_change(&fp, sender_handle)? {
        TrustChange::New | TrustChange::Known => {
            self.keyring.upsert_peer(&PeerRecord {
                fingerprint: fp,
                pubkey: sender_pubkey,
                last_handle: Some(sender_handle.to_string()),
                last_nick: None,
                first_seen: now,
                last_seen: now,
                global_status: TrustStatus::Trusted,
            })?;
        }
        conflict @ (TrustChange::HandleChanged { .. }
        | TrustChange::FingerprintChanged { .. }
        | TrustChange::Revoked { .. }) => {
            let mismatch = handle_mismatch_for(&conflict, sender_handle);
            // Only a changed fingerprint carries the new key along.
            let new_pubkey = matches!(conflict, TrustChange::FingerprintChanged { .. })
                .then_some(sender_pubkey);
            self.record_trust_change(PendingTrustNotice {
                handle: sender_handle.to_string(),
                channel: rsp.channel.clone(),
                change: conflict,
                new_pubkey,
            });
            return Err(mismatch);
        }
    }

    let session = IncomingSession {
        handle: sender_handle.to_string(),
        channel: rsp.channel.clone(),
        fingerprint: fp,
        sk,
        status: TrustStatus::Trusted,
        created_at: now,
    };
    match self.keyring.install_incoming_session_strict(&session) {
        Ok(_) => Ok(()),
        Err(install_err) => {
            if let E2eError::HandleMismatch { .. } = &install_err {
                // The stored session for this handle/channel has another
                // fingerprint: surface it as a fingerprint change.
                let existing_fp = self
                    .keyring
                    .get_incoming_session(sender_handle, &rsp.channel)?
                    .map(|existing| existing.fingerprint)
                    .unwrap_or_default();
                self.record_trust_change(PendingTrustNotice {
                    handle: sender_handle.to_string(),
                    channel: rsp.channel.clone(),
                    change: TrustChange::FingerprintChanged {
                        handle: sender_handle.to_string(),
                        old_fp: existing_fp,
                        new_fp: fp,
                    },
                    new_pubkey: Some(sender_pubkey),
                });
            }
            Err(install_err)
        }
    }
}
}
/// Converts a trust conflict into an `E2eError::HandleMismatch`.
///
/// `expected` describes what was on record: the old handle for a handle
/// change, or the handle plus hex fingerprint for a fingerprint change or a
/// revoked key. `New` and `Known` produce an empty `expected`. `got` is
/// always `got_handle`.
fn handle_mismatch_for(change: &TrustChange, got_handle: &str) -> E2eError {
    let mismatch = |expected: String| E2eError::HandleMismatch {
        expected,
        got: got_handle.to_string(),
    };
    match change {
        TrustChange::New | TrustChange::Known => mismatch(String::new()),
        TrustChange::HandleChanged { old_handle, .. } => mismatch(old_handle.clone()),
        TrustChange::FingerprintChanged { handle, old_fp, .. } => {
            mismatch(format!("{handle} (fp={})", hex::encode(old_fp)))
        }
        TrustChange::Revoked {
            handle,
            fingerprint,
        } => mismatch(format!(
            "{handle} (revoked, fp={})",
            hex::encode(fingerprint)
        )),
    }
}
/// HKDF info / AAD string used when wrapping a session key for `channel`
/// in a KEYRSP: the fixed prefix `RPE2E01-WRAP:` followed by the channel name.
fn wrap_info(channel: &str) -> String {
    ["RPE2E01-WRAP:", channel].concat()
}
/// HKDF info / AAD string used when wrapping a session key for `channel`
/// in a KEYREKEY: the fixed prefix `RPE2E01-REKEY:` followed by the channel name.
fn rekey_info(channel: &str) -> String {
    ["RPE2E01-REKEY:", channel].concat()
}
/// Derives a 32-byte wrap key from an X25519 exchange.
///
/// Performs Diffie-Hellman between `secret` and the peer public key given as
/// raw bytes, then runs HKDF-SHA256 over the shared secret with the fixed
/// salt `RPE2E01-WRAP` and the caller-supplied `info`.
fn derive_wrap_key(secret: &StaticSecret, peer_pub_bytes: &[u8; 32], info: &[u8]) -> [u8; 32] {
    let shared_secret = secret.diffie_hellman(&XPub::from(*peer_pub_bytes));
    let mut wrap_key = [0u8; 32];
    Hkdf::<Sha256>::new(Some(b"RPE2E01-WRAP"), shared_secret.as_bytes())
        .expand(info, &mut wrap_key)
        .expect("hkdf expand 32 bytes never fails for OKM ≤ 255 * HashLen");
    wrap_key
}
/// Current time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock is set before the epoch and saturates at
/// `i64::MAX` if the second count does not fit in an `i64`.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
// Unit tests for `E2eManager`, run against an in-memory SQLite keyring.
#[cfg(test)]
mod tests {
use super::*;
use crate::e2e::keyring::Keyring;
use rusqlite::Connection;
use std::sync::{Arc, Mutex as StdMutex};
// Table definitions the tests create in the in-memory database before
// handing the connection to `Keyring`.
const SCHEMA: &str = "
CREATE TABLE e2e_identity (id INTEGER PRIMARY KEY CHECK (id = 1), pubkey BLOB NOT NULL, privkey BLOB NOT NULL, fingerprint BLOB NOT NULL, created_at INTEGER NOT NULL);
CREATE TABLE e2e_peers (fingerprint BLOB PRIMARY KEY, pubkey BLOB NOT NULL, last_handle TEXT, last_nick TEXT, first_seen INTEGER NOT NULL, last_seen INTEGER NOT NULL, global_status TEXT NOT NULL DEFAULT 'pending');
CREATE TABLE e2e_outgoing_sessions (channel TEXT PRIMARY KEY, sk BLOB NOT NULL, created_at INTEGER NOT NULL, pending_rotation INTEGER NOT NULL DEFAULT 0);
CREATE TABLE e2e_incoming_sessions (handle TEXT NOT NULL, channel TEXT NOT NULL, fingerprint BLOB NOT NULL, sk BLOB NOT NULL, status TEXT NOT NULL DEFAULT 'pending', created_at INTEGER NOT NULL, PRIMARY KEY (handle, channel));
CREATE TABLE e2e_channel_config (channel TEXT PRIMARY KEY, enabled INTEGER NOT NULL DEFAULT 0, mode TEXT NOT NULL DEFAULT 'normal');
CREATE TABLE e2e_autotrust (id INTEGER PRIMARY KEY AUTOINCREMENT, scope TEXT NOT NULL, handle_pattern TEXT NOT NULL, created_at INTEGER NOT NULL, UNIQUE(scope, handle_pattern));
CREATE TABLE e2e_outgoing_recipients (channel TEXT NOT NULL, handle TEXT NOT NULL, fingerprint BLOB NOT NULL, first_sent_at INTEGER NOT NULL, PRIMARY KEY (channel, handle));
";
// Builds a manager backed by a fresh in-memory database with SCHEMA applied.
fn make_manager() -> E2eManager {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch(SCHEMA).unwrap();
let kr = Keyring::new(Arc::new(StdMutex::new(conn)));
E2eManager::load_or_init(kr).unwrap()
}
// Two managers created over the same connection must end up with the same
// identity public key: the first generates it, the second loads it.
#[test]
fn load_or_init_persists_identity() {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch(SCHEMA).unwrap();
let shared = Arc::new(StdMutex::new(conn));
let kr1 = Keyring::new(shared.clone());
let m1 = E2eManager::load_or_init(kr1).unwrap();
let pk1 = m1.identity_pub();
let kr2 = Keyring::new(shared);
let m2 = E2eManager::load_or_init(kr2).unwrap();
let pk2 = m2.identity_pub();
assert_eq!(pk1, pk2, "identity must persist across loads");
}
// A built KEYREQ must carry a valid signature and leave exactly one
// pending handshake behind.
#[test]
fn build_keyreq_stores_pending_and_is_signed() {
let mgr = make_manager();
let req = mgr.build_keyreq("#x").unwrap();
let payload = signed_keyreq_payload("#x", &req.pubkey, &req.eph_x25519, &req.nonce);
sig::verify(&req.pubkey, &payload, &req.sig).unwrap();
assert_eq!(
mgr.pending
.lock()
.expect("pending mutex poisoned in test")
.len(),
1
);
}
// Building twice for the same channel yields distinct nonces, so both
// requests can be in flight at once.
#[test]
fn build_keyreq_allows_multiple_in_flight_per_channel() {
let mgr = make_manager();
let req1 = mgr.build_keyreq("#x").unwrap();
let req2 = mgr.build_keyreq("#x").unwrap();
assert_ne!(req1.nonce, req2.nonce, "each build should fresh-nonce");
assert!(mgr.has_pending_keyreq("#x"));
}
// Decrypting a chunk from a sender with no incoming session must report
// MissingKey rather than attempting decryption.
#[test]
fn encrypt_decrypt_requires_session() {
let mgr = make_manager();
let wire = mgr.encrypt_outgoing("#x", "hi").unwrap().remove(0);
let outcome = mgr.decrypt_incoming("~alice@host", "#x", &wire).unwrap();
match outcome {
DecryptOutcome::MissingKey { .. } => {}
other => panic!("expected MissingKey, got {other:?}"),
}
}
// Seeds one entry per pending structure for "~bob@host" (handshake, parked
// inbound request, accept prompt, reciprocal KEYREQ) and checks that
// forgetting the peer removes all four.
#[test]
fn forget_peer_everywhere_clears_handle_scoped_pending_state() {
let mgr = make_manager();
let _ = mgr.build_keyreq_for_peer("#x", Some("~bob@host")).unwrap();
mgr.pending_inbound
.lock()
.expect("pending inbound mutex poisoned in test")
.insert(
("~bob@host".to_string(), "#x".to_string()),
PendingInboundKeyReq {
req: mgr.build_keyreq("#x").unwrap(),
},
);
mgr.pending_accept_requests
.lock()
.expect("pending accept mutex poisoned in test")
.push(PendingAcceptRequest {
nick: Some("bob".to_string()),
handle: "~bob@host".to_string(),
channel: "#x".to_string(),
});
mgr.pending_outbound_keyreqs
.lock()
.expect("pending outbound mutex poisoned in test")
.push(PendingOutboundKeyReq {
peer_handle: "~bob@host".to_string(),
channel: "#x".to_string(),
req: mgr.build_keyreq("#x").unwrap(),
});
let deleted = mgr.forget_peer_everywhere("~bob@host").unwrap();
assert!(deleted >= 4, "expected multiple pending rows removed");
// Handshakes built without a peer handle (the helper `build_keyreq` calls
// above) are allowed to remain; only bob's must be gone.
assert!(
mgr.pending
.lock()
.expect("pending mutex poisoned in test")
.values()
.all(|p| p.peer_handle.as_deref() != Some("~bob@host"))
);
assert!(
mgr.pending_inbound
.lock()
.expect("pending inbound mutex poisoned in test")
.is_empty()
);
assert!(mgr.take_pending_accept_requests().is_empty());
assert!(mgr.take_pending_outbound_keyreqs().is_empty());
}
}