use std::time::Instant;
use crate::NodeAddr;
use crate::config::SessionMmpConfig;
use crate::mmp::MmpSessionState;
use crate::node::REKEY_JITTER_SECS;
use crate::noise::{HandshakeState, NoiseSession};
use rand::RngExt;
use secp256k1::PublicKey;
/// Draws a random rekey jitter, in seconds, from the inclusive range
/// `-REKEY_JITTER_SECS..=REKEY_JITTER_SECS`.
fn draw_rekey_jitter() -> i64 {
    let jitter_bounds = -REKEY_JITTER_SECS..=REKEY_JITTER_SECS;
    rand::rng().random_range(jitter_bounds)
}
/// State of an end-to-end session: either a handshake that is still in
/// progress or an established Noise session.
// The variants store their payloads inline (unboxed), so the variant-size
// lint is silenced here.
#[allow(clippy::large_enum_variant)]
pub(crate) enum EndToEndState {
/// Handshake in progress; presumably the side that started the handshake
/// (inferred from the name — confirm against the callers).
Initiating(HandshakeState),
/// Handshake in progress; presumably the responder waiting for the third
/// handshake message (inferred from the name — confirm against the callers).
AwaitingMsg3(HandshakeState),
/// Handshake finished; holds the transport session.
Established(NoiseSession),
}
/// Identifies which of a `SessionEntry`'s session slots decrypted a frame in
/// `SessionEntry::fsp_trial_decrypt`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EpochSlot {
/// The session held in `EndToEndState::Established`.
Current,
/// The session stored in `pending_new_session`.
Pending,
/// The session stored in `previous_noise_session`.
Previous,
}
impl EndToEndState {
pub(crate) fn is_established(&self) -> bool {
matches!(self, EndToEndState::Established(_))
}
pub(crate) fn is_initiating(&self) -> bool {
matches!(self, EndToEndState::Initiating(_))
}
pub(crate) fn is_awaiting_msg3(&self) -> bool {
matches!(self, EndToEndState::AwaitingMsg3(_))
}
}
/// Per-peer end-to-end session record: handshake/transport state, traffic
/// counters, handshake resend bookkeeping, and rekey / epoch-overlap state.
pub(crate) struct SessionEntry {
/// Peer address supplied at construction; not read anywhere in this file.
#[allow(dead_code)]
remote_addr: NodeAddr,
/// Peer public key supplied at construction.
remote_pubkey: PublicKey,
/// Current session state. `None` only between `take_state` and `set_state`;
/// `state()` / `state_mut()` panic while it is `None`.
state: Option<EndToEndState>,
/// `now_ms` passed to `new`; only read through the test-only accessor.
#[cfg_attr(not(test), allow(dead_code))]
created_at: u64,
/// Timestamp last passed to `touch` (initially the construction time).
last_activity: u64,
/// Timestamp last passed to `touch_inbound_frame` (initially the
/// construction time).
last_inbound_frame_ms: u64,
/// Base for `session_timestamp`; set by `mark_established` and on every
/// pending-session promotion. Starts at 0.
session_start_ms: u64,
/// Counter with a plain getter/setter; its meaning is defined by callers
/// outside this file.
coords_warmup_remaining: u8,
/// Role flag supplied at construction; forwarded to `MmpSessionState::new`.
is_initiator: bool,
/// MMP state, created by `init_mmp`.
mmp: Option<MmpSessionState>,
/// Number of `record_sent` calls.
packets_sent: u64,
/// Number of `record_recv` calls.
packets_recv: u64,
/// Sum of the byte counts passed to `record_sent`.
bytes_sent: u64,
/// Sum of the byte counts passed to `record_recv`.
bytes_recv: u64,
/// Stored handshake bytes, kept for resending.
handshake_payload: Option<Vec<u8>>,
/// Number of `record_resend` calls since the payload was last set.
resend_count: u32,
/// Next resend deadline for `handshake_payload`; 0 after clearing.
next_resend_at_ms: u64,
/// Toggled on every pending-session promotion; compared against the K bit
/// of received frames in `fsp_trial_decrypt`.
current_k_bit: bool,
/// Session that was current before the last promotion.
previous_noise_session: Option<NoiseSession>,
/// Time of the last promotion; 0 means no drain is in progress.
drain_started_ms: u64,
/// Last time the previous session decrypted a frame during a drain; 0 if
/// it has not.
previous_last_used_ms: u64,
/// Handshake state of a rekey that is in progress.
rekey_state: Option<HandshakeState>,
/// New session produced by a rekey, waiting to be promoted to current.
pending_new_session: Option<NoiseSession>,
/// Role flag recorded by `set_rekey_state`; cleared on promotion/abandon.
rekey_initiator: bool,
/// Time last passed to `record_peer_rekey`; 0 disables dampening.
last_peer_rekey_ms: u64,
/// Caller-managed timestamp; reset to 0 on promotion and on abandon.
rekey_completed_ms: u64,
/// Stored rekey msg3 bytes, kept for resending.
rekey_msg3_payload: Option<Vec<u8>>,
/// Next resend deadline for `rekey_msg3_payload`; 0 after clearing.
rekey_msg3_next_resend_ms: u64,
/// Number of `record_rekey_msg3_resend` calls since the payload was set.
rekey_msg3_resend_count: u32,
/// Set by `confirm_peer_new_epoch`; cleared when a new msg3 payload is set
/// and on abandon.
peer_new_epoch_confirmed: bool,
/// Jitter drawn at construction and redrawn on every promotion.
rekey_jitter_secs: i64,
/// Saturating count of decrypt failures since the last reset.
consecutive_decrypt_failures: u32,
}
impl SessionEntry {
/// Creates a session entry in the given `state`.
///
/// `now_ms` seeds the creation, last-activity and last-inbound-frame
/// timestamps. Every counter, resend field and rekey field starts zeroed or
/// empty, and a fresh rekey jitter is drawn.
pub(crate) fn new(
    remote_addr: NodeAddr,
    remote_pubkey: PublicKey,
    state: EndToEndState,
    now_ms: u64,
    is_initiator: bool,
) -> Self {
    let rekey_jitter_secs = draw_rekey_jitter();
    Self {
        // Identity and session state.
        remote_addr,
        remote_pubkey,
        state: Some(state),
        is_initiator,
        mmp: None,

        // Timestamps.
        created_at: now_ms,
        last_activity: now_ms,
        last_inbound_frame_ms: now_ms,
        session_start_ms: 0,
        coords_warmup_remaining: 0,

        // Traffic counters.
        packets_sent: 0,
        packets_recv: 0,
        bytes_sent: 0,
        bytes_recv: 0,

        // Handshake resend bookkeeping.
        handshake_payload: None,
        resend_count: 0,
        next_resend_at_ms: 0,

        // Epoch overlap and drain.
        current_k_bit: false,
        previous_noise_session: None,
        drain_started_ms: 0,
        previous_last_used_ms: 0,

        // Rekey.
        rekey_state: None,
        pending_new_session: None,
        rekey_initiator: false,
        last_peer_rekey_ms: 0,
        rekey_completed_ms: 0,
        rekey_msg3_payload: None,
        rekey_msg3_next_resend_ms: 0,
        rekey_msg3_resend_count: 0,
        peer_new_epoch_confirmed: false,
        rekey_jitter_secs,

        consecutive_decrypt_failures: 0,
    }
}
/// Returns the peer public key supplied at construction.
pub(crate) fn remote_pubkey(&self) -> &PublicKey {
&self.remote_pubkey
}
/// Borrows the session state.
///
/// # Panics
///
/// Panics if the state was removed with `take_state` and not put back.
#[allow(dead_code)]
pub(crate) fn state(&self) -> &EndToEndState {
    let Some(current) = self.state.as_ref() else {
        panic!("session state taken but not restored");
    };
    current
}
/// Mutably borrows the session state.
///
/// # Panics
///
/// Panics if the state was removed with `take_state` and not put back.
pub(crate) fn state_mut(&mut self) -> &mut EndToEndState {
    let Some(current) = self.state.as_mut() else {
        panic!("session state taken but not restored");
    };
    current
}
/// Installs `state` as the session state, replacing whatever was stored.
pub(crate) fn set_state(&mut self, state: EndToEndState) {
self.state = Some(state);
}
/// Moves the session state out, leaving `None` behind. Until `set_state`
/// is called again, `state()` and `state_mut()` panic.
pub(crate) fn take_state(&mut self) -> Option<EndToEndState> {
self.state.take()
}
/// Records `now_ms` as the last-activity timestamp.
pub(crate) fn touch(&mut self, now_ms: u64) {
self.last_activity = now_ms;
}
/// Records `now_ms` as the last-inbound-frame timestamp.
pub(crate) fn touch_inbound_frame(&mut self, now_ms: u64) {
self.last_inbound_frame_ms = now_ms;
}
pub(crate) fn is_established(&self) -> bool {
self.state.as_ref().is_some_and(|s| s.is_established())
}
pub(crate) fn is_initiating(&self) -> bool {
self.state.as_ref().is_some_and(|s| s.is_initiating())
}
pub(crate) fn is_awaiting_msg3(&self) -> bool {
self.state.as_ref().is_some_and(|s| s.is_awaiting_msg3())
}
/// Test-only: returns the `now_ms` value passed to `new`.
#[cfg(test)]
pub(crate) fn created_at(&self) -> u64 {
self.created_at
}
/// Returns the timestamp last recorded by `touch` (or the construction
/// time if `touch` was never called).
pub(crate) fn last_activity(&self) -> u64 {
self.last_activity
}
/// Test-only: returns the timestamp last recorded by `touch_inbound_frame`
/// (or the construction time if it was never called).
#[cfg(test)]
pub(crate) fn last_inbound_frame_ms(&self) -> u64 {
self.last_inbound_frame_ms
}
/// Returns `true` when at least one packet has been sent and more than
/// `timeout_ms` has passed since the last inbound frame.
///
/// The elapsed time saturates at zero if `now_ms` is earlier than the last
/// inbound-frame timestamp.
pub(crate) fn has_stale_outbound_only_activity(&self, now_ms: u64, timeout_ms: u64) -> bool {
    if self.packets_sent == 0 {
        return false;
    }
    let inbound_silence_ms = now_ms.saturating_sub(self.last_inbound_frame_ms);
    inbound_silence_ms > timeout_ms
}
/// Returns the stored coords-warmup counter.
pub(crate) fn coords_warmup_remaining(&self) -> u8 {
self.coords_warmup_remaining
}
/// Overwrites the coords-warmup counter with `value`.
pub(crate) fn set_coords_warmup_remaining(&mut self, value: u8) {
self.coords_warmup_remaining = value;
}
/// Records `now_ms` as the session start time, the base that
/// `session_timestamp` measures from. Does not change `state`.
pub(crate) fn mark_established(&mut self, now_ms: u64) {
self.session_start_ms = now_ms;
}
/// Returns the milliseconds elapsed since the session start, reduced to
/// 32 bits.
///
/// The subtraction wraps instead of panicking when `now_ms` is earlier than
/// the start time, and the `as u32` cast keeps only the low 32 bits.
pub(crate) fn session_timestamp(&self, now_ms: u64) -> u32 {
    let elapsed_ms = now_ms.wrapping_sub(self.session_start_ms);
    elapsed_ms as u32
}
/// Returns the initiator flag supplied at construction.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn is_initiator(&self) -> bool {
self.is_initiator
}
/// Borrows the MMP state, or `None` if `init_mmp` has not been called.
pub(crate) fn mmp(&self) -> Option<&MmpSessionState> {
self.mmp.as_ref()
}
/// Mutably borrows the MMP state, or `None` if `init_mmp` has not been
/// called.
pub(crate) fn mmp_mut(&mut self) -> Option<&mut MmpSessionState> {
self.mmp.as_mut()
}
/// Builds a fresh `MmpSessionState` from `config` and this entry's
/// initiator flag, replacing any MMP state already stored.
pub(crate) fn init_mmp(&mut self, config: &SessionMmpConfig) {
    let fresh_mmp = MmpSessionState::new(config, self.is_initiator);
    self.mmp = Some(fresh_mmp);
}
/// Counts one sent packet of `bytes` bytes in the traffic counters.
pub(crate) fn record_sent(&mut self, bytes: usize) {
    let packet_len = bytes as u64;
    self.packets_sent += 1;
    self.bytes_sent += packet_len;
}
/// Counts one received packet of `bytes` bytes in the traffic counters.
pub(crate) fn record_recv(&mut self, bytes: usize) {
    let packet_len = bytes as u64;
    self.packets_recv += 1;
    self.bytes_recv += packet_len;
}
/// Returns the traffic counters as
/// `(packets_sent, packets_recv, bytes_sent, bytes_recv)`.
pub(crate) fn traffic_counters(&self) -> (u64, u64, u64, u64) {
    let (pkts_out, pkts_in) = (self.packets_sent, self.packets_recv);
    let (bytes_out, bytes_in) = (self.bytes_sent, self.bytes_recv);
    (pkts_out, pkts_in, bytes_out, bytes_in)
}
/// Stores `payload` for resending, schedules the first resend at
/// `next_resend_at_ms`, and restarts the resend counter from zero.
pub(crate) fn set_handshake_payload(&mut self, payload: Vec<u8>, next_resend_at_ms: u64) {
    self.resend_count = 0;
    self.next_resend_at_ms = next_resend_at_ms;
    self.handshake_payload = Some(payload);
}
/// Borrows the stored handshake payload bytes, if any.
pub(crate) fn handshake_payload(&self) -> Option<&[u8]> {
self.handshake_payload.as_deref()
}
/// Drops the stored handshake payload and zeroes its resend deadline.
/// The resend counter is left as it is.
pub(crate) fn clear_handshake_payload(&mut self) {
    self.next_resend_at_ms = 0;
    self.handshake_payload = None;
}
/// Returns how many handshake resends have been recorded since the payload
/// was last set.
pub(crate) fn resend_count(&self) -> u32 {
self.resend_count
}
/// Returns the next handshake resend deadline (0 after the payload is
/// cleared).
pub(crate) fn next_resend_at_ms(&self) -> u64 {
self.next_resend_at_ms
}
/// Counts one handshake resend and schedules the next one at
/// `next_resend_at_ms`.
pub(crate) fn record_resend(&mut self, next_resend_at_ms: u64) {
    self.next_resend_at_ms = next_resend_at_ms;
    self.resend_count += 1;
}
/// Returns the K bit of the current epoch; it toggles on every promotion
/// of a pending session.
pub(crate) fn current_k_bit(&self) -> bool {
self.current_k_bit
}
/// Returns `true` while a rekey handshake state is stored.
pub(crate) fn has_rekey_in_progress(&self) -> bool {
self.rekey_state.is_some()
}
/// Borrows the pending (not yet promoted) session, if one is installed.
pub(crate) fn pending_new_session(&self) -> Option<&NoiseSession> {
self.pending_new_session.as_ref()
}
/// Test-only: mutably borrows the previous-epoch session, if any.
#[cfg(test)]
pub(crate) fn previous_noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
self.previous_noise_session.as_mut()
}
/// Mutably borrows the current session when the state is `Established`;
/// returns `None` for any other state or when the state has been taken.
pub(crate) fn current_noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
    let Some(EndToEndState::Established(session)) = self.state.as_mut() else {
        return None;
    };
    Some(session)
}
/// Returns the role flag recorded by the last `set_rekey_state` call
/// (`false` after a promotion or abandon).
pub(crate) fn is_rekey_initiator(&self) -> bool {
self.rekey_initiator
}
/// Returns `true` when a peer rekey has been recorded and fewer than
/// `dampening_ms` milliseconds have passed since then.
///
/// A recorded time of 0 is treated as "no peer rekey seen" and never
/// dampens.
pub(crate) fn is_rekey_dampened(&self, now_ms: u64, dampening_ms: u64) -> bool {
    let peer_rekey_seen = self.last_peer_rekey_ms != 0;
    peer_rekey_seen && now_ms.saturating_sub(self.last_peer_rekey_ms) < dampening_ms
}
/// Records `now_ms` as the time of the latest peer rekey, the reference
/// point for `is_rekey_dampened`.
pub(crate) fn record_peer_rekey(&mut self, now_ms: u64) {
self.last_peer_rekey_ms = now_ms;
}
/// Returns the session start time set by `mark_established` or by the
/// latest pending-session promotion (0 if neither has happened).
pub(crate) fn session_start_ms(&self) -> u64 {
self.session_start_ms
}
/// Returns the current session's send counter, or 0 when the state is not
/// `Established` (or has been taken).
pub(crate) fn send_counter(&self) -> u64 {
    let Some(EndToEndState::Established(session)) = self.state.as_ref() else {
        return 0;
    };
    session.current_send_counter()
}
/// Returns the value last stored by `set_rekey_completed_ms`; promotion and
/// `abandon_rekey` reset it to 0.
pub(crate) fn rekey_completed_ms(&self) -> u64 {
self.rekey_completed_ms
}
/// Returns this entry's rekey jitter in seconds; drawn at construction and
/// redrawn on every promotion.
pub(crate) fn rekey_jitter_secs(&self) -> i64 {
self.rekey_jitter_secs
}
/// Stores `ms` as the rekey-completed timestamp.
pub(crate) fn set_rekey_completed_ms(&mut self, ms: u64) {
self.rekey_completed_ms = ms;
}
/// Stores the rekey msg3 `payload` for resending, schedules the first
/// resend at `next_resend_at_ms`, restarts its resend counter, and clears
/// the peer-new-epoch confirmation flag.
pub(crate) fn set_rekey_msg3_payload(&mut self, payload: Vec<u8>, next_resend_at_ms: u64) {
    self.peer_new_epoch_confirmed = false;
    self.rekey_msg3_resend_count = 0;
    self.rekey_msg3_next_resend_ms = next_resend_at_ms;
    self.rekey_msg3_payload = Some(payload);
}
/// Borrows the stored rekey msg3 payload bytes, if any.
pub(crate) fn rekey_msg3_payload(&self) -> Option<&[u8]> {
self.rekey_msg3_payload.as_deref()
}
/// Returns the next rekey msg3 resend deadline (0 after clearing).
pub(crate) fn rekey_msg3_next_resend_ms(&self) -> u64 {
self.rekey_msg3_next_resend_ms
}
/// Returns how many rekey msg3 resends have been recorded since the
/// payload was last set or cleared.
pub(crate) fn rekey_msg3_resend_count(&self) -> u32 {
self.rekey_msg3_resend_count
}
/// Counts one rekey msg3 resend and schedules the next one at
/// `next_resend_at_ms`.
pub(crate) fn record_rekey_msg3_resend(&mut self, next_resend_at_ms: u64) {
    self.rekey_msg3_next_resend_ms = next_resend_at_ms;
    self.rekey_msg3_resend_count += 1;
}
/// Drops the stored rekey msg3 payload and zeroes both its resend deadline
/// and its resend counter.
pub(crate) fn clear_rekey_msg3_payload(&mut self) {
    self.rekey_msg3_resend_count = 0;
    self.rekey_msg3_next_resend_ms = 0;
    self.rekey_msg3_payload = None;
}
/// Test-only: returns whether `confirm_peer_new_epoch` has been called
/// since the flag was last cleared.
#[cfg(test)]
pub(crate) fn peer_new_epoch_confirmed(&self) -> bool {
self.peer_new_epoch_confirmed
}
/// Marks the peer as confirmed on the new epoch and discards the stored
/// rekey msg3 payload together with its resend bookkeeping.
pub(crate) fn confirm_peer_new_epoch(&mut self) {
    self.clear_rekey_msg3_payload();
    self.peer_new_epoch_confirmed = true;
}
/// Tries to decrypt one frame against each session slot this entry holds.
///
/// Slots are attempted in order until one succeeds. The pending slot goes
/// first when `received_k_bit` differs from the current K bit and a pending
/// session is installed; otherwise the current slot goes first. The previous
/// slot is always attempted last. Empty slots are skipped. A success in the
/// previous slot also records `now_ms` via `refresh_previous_use`.
///
/// Returns the plaintext together with the slot that produced it, or `None`
/// when no slot accepts the frame.
pub(crate) fn fsp_trial_decrypt(
    &mut self,
    ciphertext: &[u8],
    counter: u64,
    aad: &[u8],
    received_k_bit: bool,
    now_ms: u64,
) -> Option<(Vec<u8>, EpochSlot)> {
    let k_bit_differs = received_k_bit != self.current_k_bit;
    let (first, second) = if k_bit_differs && self.pending_new_session.is_some() {
        (EpochSlot::Pending, EpochSlot::Current)
    } else {
        (EpochSlot::Current, EpochSlot::Pending)
    };

    for slot in [first, second, EpochSlot::Previous] {
        let candidate = match slot {
            EpochSlot::Current => self.current_noise_session_mut(),
            EpochSlot::Pending => self.pending_new_session.as_mut(),
            EpochSlot::Previous => self.previous_noise_session.as_mut(),
        };
        let Some(session) = candidate else {
            continue;
        };
        let Ok(plaintext) = session.decrypt_with_replay_check_and_aad(ciphertext, counter, aad)
        else {
            continue;
        };
        if slot == EpochSlot::Previous {
            self.refresh_previous_use(now_ms);
        }
        return Some((plaintext, slot));
    }

    None
}
/// Installs `session` as the pending new-epoch session and drops any
/// stored rekey handshake state.
pub(crate) fn set_pending_session(&mut self, session: NoiseSession) {
    self.rekey_state = None;
    self.pending_new_session = Some(session);
}
/// Stores the handshake `state` of a rekey in progress and records whether
/// this side initiated it.
pub(crate) fn set_rekey_state(&mut self, state: HandshakeState, is_initiator: bool) {
    self.rekey_initiator = is_initiator;
    self.rekey_state = Some(state);
}
/// Moves the rekey handshake state out, leaving `None` behind.
pub(crate) fn take_rekey_state(&mut self) -> Option<HandshakeState> {
self.rekey_state.take()
}
/// Promotes the pending session to current.
///
/// Returns `false` and changes nothing when no pending session is
/// installed. Otherwise:
/// - the outgoing `Established` session (if any) moves to the previous slot;
/// - the drain clock starts at `now_ms` and the previous-use stamp resets;
/// - the K bit toggles and the session start time becomes `now_ms`;
/// - rekey handshake state, initiator flag and completed timestamp reset;
/// - a new rekey jitter is drawn and MMP state (if any) is reset.
///
/// The stored rekey msg3 payload is left untouched.
fn promote_pending(&mut self, now_ms: u64) -> bool {
    let Some(incoming) = self.pending_new_session.take() else {
        return false;
    };

    // Swap in the new epoch; only an established outgoing session is kept.
    let outgoing = self.state.replace(EndToEndState::Established(incoming));
    if let Some(EndToEndState::Established(old)) = outgoing {
        self.previous_noise_session = Some(old);
    }
    self.current_k_bit = !self.current_k_bit;
    self.session_start_ms = now_ms;

    // Start draining the previous epoch.
    self.drain_started_ms = now_ms;
    self.previous_last_used_ms = 0;

    // Reset rekey bookkeeping for the next cycle.
    self.rekey_state = None;
    self.rekey_initiator = false;
    self.rekey_completed_ms = 0;
    self.rekey_jitter_secs = draw_rekey_jitter();

    if let Some(mmp) = self.mmp.as_mut() {
        mmp.reset_for_rekey(Instant::now());
    }
    true
}
/// Promotes the pending session to current via `promote_pending`.
/// Returns `false` when no pending session is installed.
pub(crate) fn cutover_to_new_session(&mut self, now_ms: u64) -> bool {
self.promote_pending(now_ms)
}
/// Promotes the pending session to current via `promote_pending`; same
/// effect as `cutover_to_new_session`. Returns `false` when no pending
/// session is installed.
pub(crate) fn handle_peer_kbit_flip(&mut self, now_ms: u64) -> bool {
self.promote_pending(now_ms)
}
/// Returns `true` when a drain is in progress and at least `drain_ms` has
/// elapsed since the later of the drain start and the last time the
/// previous session was used.
///
/// Always `false` when no drain is in progress (drain start of 0).
pub(crate) fn drain_expired(&self, now_ms: u64, drain_ms: u64) -> bool {
    let anchor_ms = match self.drain_started_ms {
        0 => return false,
        started_ms => started_ms.max(self.previous_last_used_ms),
    };
    now_ms.saturating_sub(anchor_ms) >= drain_ms
}
/// Returns `true` while a drain is in progress, i.e. the drain start time
/// is non-zero.
pub(crate) fn is_draining(&self) -> bool {
    self.drain_started_ms != 0
}
/// Records `now_ms` as the last use of the previous session. Does nothing
/// when no drain is in progress.
pub(crate) fn refresh_previous_use(&mut self, now_ms: u64) {
    if self.drain_started_ms == 0 {
        return;
    }
    self.previous_last_used_ms = now_ms;
}
/// Ends the drain: drops the previous session and zeroes both drain
/// timestamps.
pub(crate) fn complete_drain(&mut self) {
    self.drain_started_ms = 0;
    self.previous_last_used_ms = 0;
    self.previous_noise_session = None;
}
/// Discards all in-flight rekey state: handshake state, pending session,
/// initiator flag, completed timestamp, msg3 payload with its resend
/// bookkeeping, and the peer-new-epoch confirmation flag.
///
/// The current session, the previous session and the K bit are untouched.
pub(crate) fn abandon_rekey(&mut self) {
    self.clear_rekey_msg3_payload();
    self.peer_new_epoch_confirmed = false;
    self.rekey_completed_ms = 0;
    self.rekey_initiator = false;
    self.pending_new_session = None;
    self.rekey_state = None;
}
/// Increments the consecutive decrypt-failure counter (saturating at
/// `u32::MAX`) and returns the new value.
pub(crate) fn record_decrypt_failure(&mut self) -> u32 {
    let failures = self.consecutive_decrypt_failures.saturating_add(1);
    self.consecutive_decrypt_failures = failures;
    failures
}
/// Resets the consecutive decrypt-failure counter to zero.
pub(crate) fn reset_decrypt_failures(&mut self) {
self.consecutive_decrypt_failures = 0;
}
/// Test-only: returns the consecutive decrypt-failure counter.
#[cfg(test)]
pub(crate) fn consecutive_decrypt_failures(&self) -> u32 {
self.consecutive_decrypt_failures
}
/// Test-only: installs `session` in the previous slot and starts the drain
/// clock at `now_ms`, mirroring what a promotion does to the drain fields.
///
/// The previous-use stamp is reset as well, as a promotion does, so a stamp
/// left over from an earlier drain cannot postpone `drain_expired` for the
/// session installed here.
#[cfg(test)]
pub(crate) fn set_previous_session_for_test(&mut self, session: NoiseSession, now_ms: u64) {
    self.previous_noise_session = Some(session);
    self.drain_started_ms = now_ms;
    self.previous_last_used_ms = 0;
}
/// Test-only: highest received counter of the previous session, or `None`
/// when the previous slot is empty.
#[cfg(test)]
pub(crate) fn previous_highest_counter(&self) -> Option<u64> {
    let previous = self.previous_noise_session.as_ref()?;
    Some(previous.highest_received_counter())
}
/// Test-only: highest received counter of the pending session, or `None`
/// when the pending slot is empty.
#[cfg(test)]
pub(crate) fn pending_highest_counter(&self) -> Option<u64> {
    let pending = self.pending_new_session.as_ref()?;
    Some(pending.highest_received_counter())
}
/// Test-only: highest received counter of the current session, or `None`
/// when the state is not `Established` (or has been taken).
#[cfg(test)]
pub(crate) fn current_highest_counter(&self) -> Option<u64> {
    let Some(EndToEndState::Established(current)) = self.state.as_ref() else {
        return None;
    };
    Some(current.highest_received_counter())
}
}
#[cfg(test)]
mod overlapping_epoch_tests {
use super::*;
use crate::node::session_wire::{FSP_FLAG_K, build_fsp_header};
use secp256k1::{Keypair, Secp256k1, SecretKey};
/// Builds a deterministic keypair from a 32-byte secret whose first byte is
/// `seed` and whose remaining bytes are `1`.
fn keypair(seed: u8) -> Keypair {
    let mut secret_bytes = [1u8; 32];
    secret_bytes[0] = seed;
    let secp = Secp256k1::new();
    let secret = SecretKey::from_slice(&secret_bytes).expect("valid secret key");
    Keypair::from_secret_key(&secp, &secret)
}
/// Runs a complete three-message XK handshake between two freshly built
/// handshake states and returns `(initiator_session, responder_session)`.
fn xk_pair(init_seed: u8, resp_seed: u8) -> (NoiseSession, NoiseSession) {
let init_kp = keypair(init_seed);
let resp_kp = keypair(resp_seed);
let mut initiator = HandshakeState::new_xk_initiator(init_kp, resp_kp.public_key());
initiator.set_local_epoch([0xA1, 0xB2, 0xC3, 0xD4, 0x11, 0x22, 0x33, 0x44]);
let mut responder = HandshakeState::new_xk_responder(resp_kp);
responder.set_local_epoch([0xD4, 0xC3, 0xB2, 0xA1, 0x44, 0x33, 0x22, 0x11]);
// Message 1: initiator -> responder.
let msg1 = initiator.write_xk_message_1().unwrap();
responder.read_xk_message_1(&msg1).unwrap();
// Message 2: responder -> initiator.
let msg2 = responder.write_xk_message_2().unwrap();
initiator.read_xk_message_2(&msg2).unwrap();
// Message 3: initiator -> responder.
let msg3 = initiator.write_xk_message_3().unwrap();
responder.read_xk_message_3(&msg3).unwrap();
(
initiator.into_session().unwrap(),
responder.into_session().unwrap(),
)
}
/// Encrypts `plaintext` with `sender`, using an FSP header as AAD.
///
/// The header is built from the sender's send counter as read before
/// encryption, the K flag when `k_bit` is set, and the plaintext length.
/// Returns `(ciphertext, counter, header)`.
fn seal(sender: &mut NoiseSession, plaintext: &[u8], k_bit: bool) -> (Vec<u8>, u64, [u8; 12]) {
    let counter = sender.current_send_counter();
    let flags = if k_bit { FSP_FLAG_K } else { 0 };
    let payload_len = plaintext.len() as u16;
    let header = build_fsp_header(counter, flags, payload_len);
    let sealed = sender.encrypt_with_aad(plaintext, &header).unwrap();
    (sealed, counter, header)
}
/// Builds an initiator-side `SessionEntry` whose current slot holds
/// `session`, created and marked established at time 1_000.
fn entry_with_current(session: NoiseSession) -> SessionEntry {
    const START_MS: u64 = 1_000;
    let mut entry = SessionEntry::new(
        NodeAddr::from_bytes([7u8; 16]),
        keypair(99).public_key(),
        EndToEndState::Established(session),
        START_MS,
        true,
    );
    entry.mark_established(START_MS);
    entry
}
/// With all three slots populated, a frame sealed for the current epoch
/// decrypts in the `Current` slot and leaves the pending and previous
/// sessions' highest received counters at 0.
#[test]
fn trial_decrypt_picks_current() {
let (mut cur_send, cur_recv) = xk_pair(1, 2);
let (_p_send, p_recv) = xk_pair(3, 4);
let (_o_send, o_recv) = xk_pair(5, 6);
let mut entry = entry_with_current(cur_recv);
entry.set_pending_session(p_recv);
entry.set_previous_session_for_test(o_recv, 1_000);
let (ct, counter, hdr) = seal(&mut cur_send, b"steady-state", false);
let (pt, slot) = entry
.fsp_trial_decrypt(&ct, counter, &hdr, false, 2_000)
.expect("current frame must decrypt");
assert_eq!(pt, b"steady-state");
assert_eq!(slot, EpochSlot::Current);
assert_eq!(entry.pending_highest_counter(), Some(0));
assert_eq!(entry.previous_highest_counter(), Some(0));
}
/// A frame sealed by the pending epoch's peer with a flipped K bit decrypts
/// in the `Pending` slot; the following K-bit-flip handling empties the
/// pending slot, fills the previous slot and toggles the K bit.
#[test]
fn trial_decrypt_picks_pending_and_promotes() {
let (_cur_send, cur_recv) = xk_pair(1, 2);
let (mut p_send, p_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(cur_recv);
let k_before = entry.current_k_bit();
entry.set_pending_session(p_recv);
let (ct, counter, hdr) = seal(&mut p_send, b"new-epoch", !k_before);
let (pt, slot) = entry
.fsp_trial_decrypt(&ct, counter, &hdr, !k_before, 2_000)
.expect("pending frame must decrypt");
assert_eq!(pt, b"new-epoch");
assert_eq!(slot, EpochSlot::Pending);
// Trial decrypt does not promote by itself; promotion is a separate call.
entry.handle_peer_kbit_flip(2_000);
assert!(entry.pending_new_session().is_none());
assert!(entry.previous_highest_counter().is_some());
assert_ne!(entry.current_k_bit(), k_before);
}
/// While draining, a frame sealed with the old epoch's keys decrypts in the
/// `Previous` slot without changing the K bit or ending the drain.
#[test]
fn trial_decrypt_picks_previous_during_drain() {
let (mut old_send, old_recv) = xk_pair(1, 2);
let (_new_send, new_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(new_recv);
entry.set_previous_session_for_test(old_recv, 1_500);
let k_after = entry.current_k_bit();
let (ct, counter, hdr) = seal(&mut old_send, b"old-straggler", !k_after);
let (pt, slot) = entry
.fsp_trial_decrypt(&ct, counter, &hdr, !k_after, 3_000)
.expect("previous frame must decrypt");
assert_eq!(pt, b"old-straggler");
assert_eq!(slot, EpochSlot::Previous);
assert_eq!(entry.current_k_bit(), k_after);
assert!(entry.is_draining());
}
/// After a new-epoch frame triggers promotion, a late frame sealed with the
/// old epoch's keys still decrypts, now through the `Previous` slot.
#[test]
fn trial_decrypt_reordered_old_after_cutover() {
let (mut cur_send, cur_recv) = xk_pair(1, 2);
let (mut p_send, p_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(cur_recv);
let k_before = entry.current_k_bit();
entry.set_pending_session(p_recv);
let (ct_new, c_new, hdr_new) = seal(&mut p_send, b"after-cutover", !k_before);
let (_pt, slot) = entry
.fsp_trial_decrypt(&ct_new, c_new, &hdr_new, !k_before, 2_000)
.unwrap();
assert_eq!(slot, EpochSlot::Pending);
entry.handle_peer_kbit_flip(2_000);
// The old-epoch frame arrives after the cutover.
let (ct_old, c_old, hdr_old) = seal(&mut cur_send, b"reordered-old", k_before);
let (pt, slot) = entry
.fsp_trial_decrypt(&ct_old, c_old, &hdr_old, k_before, 2_500)
.expect("reordered old-epoch frame must still decrypt");
assert_eq!(pt, b"reordered-old");
assert_eq!(slot, EpochSlot::Previous);
}
/// A replayed current-epoch frame is rejected by every slot and does not
/// advance the pending session's counter; a pending-epoch frame carrying the
/// same counter value (0) is still accepted by the `Pending` slot.
#[test]
fn trial_decrypt_replay_is_per_slot() {
let (mut cur_send, cur_recv) = xk_pair(1, 2);
let (mut p_send, p_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(cur_recv);
let k_before = entry.current_k_bit();
entry.set_pending_session(p_recv);
let (ct, counter, hdr) = seal(&mut cur_send, b"first", k_before);
let (_pt, slot) = entry
.fsp_trial_decrypt(&ct, counter, &hdr, k_before, 2_000)
.unwrap();
assert_eq!(slot, EpochSlot::Current);
// Same ciphertext and counter again: a genuine replay.
assert!(
entry
.fsp_trial_decrypt(&ct, counter, &hdr, k_before, 2_100)
.is_none(),
"a genuine replay must be rejected by every slot"
);
assert_eq!(entry.pending_highest_counter(), Some(0));
let (ct_p, c_p, hdr_p) = seal(&mut p_send, b"pending-c0", !k_before);
assert_eq!(c_p, 0);
let (pt, slot) = entry
.fsp_trial_decrypt(&ct_p, c_p, &hdr_p, !k_before, 2_200)
.expect("pending frame must decrypt despite current replay overlap");
assert_eq!(pt, b"pending-c0");
assert_eq!(slot, EpochSlot::Pending);
}
/// A slot that fails to decrypt a frame must not record the frame's counter:
/// after a counter-4 pending frame is accepted, the current and previous
/// sessions' highest received counters remain 0.
#[test]
fn trial_decrypt_failed_slot_leaves_replay_window_intact() {
let (_cur_send, cur_recv) = xk_pair(1, 2);
let (mut p_send, p_recv) = xk_pair(3, 4);
let (_o_send, o_recv) = xk_pair(5, 6);
let mut entry = entry_with_current(cur_recv);
let k_before = entry.current_k_bit();
entry.set_pending_session(p_recv);
entry.set_previous_session_for_test(o_recv, 1_000);
// Discard four sealed frames so the next one carries counter 4.
for _ in 0..4 {
let _ = seal(&mut p_send, b"warmup", !k_before);
}
let (ct, counter, hdr) = seal(&mut p_send, b"pending-hit", !k_before);
assert_eq!(counter, 4);
// `received_k_bit` is `false`, equal to the K bit of a fresh entry, so the
// Current slot is attempted (and fails) before the Pending slot.
let (_pt, slot) = entry
.fsp_trial_decrypt(&ct, counter, &hdr, false, 2_000)
.expect("pending frame must decrypt");
assert_eq!(slot, EpochSlot::Pending);
assert_eq!(entry.current_highest_counter(), Some(0));
assert_eq!(entry.previous_highest_counter(), Some(0));
assert_eq!(entry.pending_highest_counter(), Some(4));
}
/// The stored rekey msg3 payload survives the cutover and a successful
/// new-epoch decrypt; it is dropped only when `confirm_peer_new_epoch` is
/// called.
#[test]
fn msg3_retransmit_stops_on_peer_new_epoch_confirmed() {
let (_cur_send, cur_recv) = xk_pair(1, 2);
let (mut p_send, p_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(cur_recv);
entry.set_pending_session(p_recv);
entry.set_rekey_completed_ms(1_000);
entry.set_rekey_msg3_payload(vec![0xAB; 73], 1_500);
assert!(entry.cutover_to_new_session(2_000));
assert!(entry.rekey_msg3_payload().is_some());
assert!(!entry.peer_new_epoch_confirmed());
let k_now = entry.current_k_bit();
let (ct, counter, hdr) = seal(&mut p_send, b"peer-on-new-epoch", k_now);
let (_pt, slot) = entry
.fsp_trial_decrypt(&ct, counter, &hdr, k_now, 2_500)
.unwrap();
assert_eq!(slot, EpochSlot::Current);
// Decrypting alone does not confirm the peer; the caller must do so.
assert!(entry.rekey_msg3_payload().is_some() && entry.pending_new_session().is_none());
entry.confirm_peer_new_epoch();
assert!(entry.peer_new_epoch_confirmed());
assert!(entry.rekey_msg3_payload().is_none());
}
/// After eight recorded msg3 resends, `abandon_rekey` clears the msg3
/// payload, the pending session and the rekey state while the entry stays
/// established and unconfirmed.
#[test]
fn msg3_retransmit_budget_exhaustion_abandons_cleanly() {
let (_cur_send, cur_recv) = xk_pair(1, 2);
let (_p_send, p_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(cur_recv);
entry.set_pending_session(p_recv);
entry.set_rekey_completed_ms(1_000);
entry.set_rekey_msg3_payload(vec![0xCD; 73], 1_500);
// Local stand-in for a resend budget; the real limit is not defined here.
let max_resends = 8;
for i in 0..max_resends {
entry.record_rekey_msg3_resend(2_000 + i as u64 * 100);
}
assert_eq!(entry.rekey_msg3_resend_count(), max_resends);
entry.abandon_rekey();
assert!(entry.rekey_msg3_payload().is_none());
assert!(entry.pending_new_session().is_none());
assert!(!entry.has_rekey_in_progress());
assert!(entry.is_established());
assert!(!entry.peer_new_epoch_confirmed());
}
/// After side `a` cuts over, side `b` (still on the old epoch) cannot read a
/// new-epoch frame, `a` can still read `b`'s old-epoch frame through its
/// `Previous` slot, and `b` can read a new-epoch frame once a pending
/// session is installed.
#[test]
fn initiator_cutover_keeps_responder_old_epoch_decryptable() {
let (old_a, old_b) = xk_pair(1, 2);
let (new_a, mut new_b) = xk_pair(3, 4);
let mut a = entry_with_current(old_a);
a.set_rekey_completed_ms(1_000);
a.set_rekey_msg3_payload(vec![0xEE; 73], 1_500);
a.set_pending_session(new_a);
assert!(a.cutover_to_new_session(2_000));
assert!(a.rekey_msg3_payload().is_some());
let mut b = entry_with_current(old_b);
// NOTE(review): the frame labelled "new-from-a" is sealed with `new_b`
// because `new_a` was moved into `a`. `b` holds no new-epoch session at
// this point, so the decrypt fails whichever half sealed the frame.
let (ct_new, c_new, hdr_new) = seal(&mut new_b, b"new-from-a", true);
assert!(
b.fsp_trial_decrypt(&ct_new, c_new, &hdr_new, true, 2_100)
.is_none(),
"responder without msg3 drops the new-epoch frame cleanly"
);
let (ct_old, c_old, hdr_old) = {
let b_old = b.current_noise_session_mut().unwrap();
seal(b_old, b"old-from-b", false)
};
let (pt, slot) = a
.fsp_trial_decrypt(&ct_old, c_old, &hdr_old, false, 2_200)
.expect("initiator must still decrypt the responder's old-epoch frame");
assert_eq!(pt, b"old-from-b");
assert_eq!(slot, EpochSlot::Previous);
// NOTE(review): a second, independent handshake pair is used here; `b`
// receives the first half (`new_a2`) and the frame is sealed with the
// second half (`new_b2`), so the names do not match the roles.
let (new_a2, mut new_b2) = xk_pair(3, 4);
b.set_pending_session(new_a2);
let (ct_new2, c_new2, hdr_new2) = seal(&mut new_b2, b"new-from-a-2", true);
let (pt, slot) = b
.fsp_trial_decrypt(&ct_new2, c_new2, &hdr_new2, true, 2_300)
.expect("responder must decrypt new-epoch frame once pending is installed");
assert_eq!(pt, b"new-from-a-2");
assert_eq!(slot, EpochSlot::Pending);
}
/// Each old-epoch frame accepted by the `Previous` slot pushes the drain
/// deadline out: the drain expires `DRAIN_MS` after the last such frame
/// (25_000 + 10_000), not `DRAIN_MS` after the cutover.
#[test]
fn drain_expiry_is_peer_progress_aware() {
const DRAIN_MS: u64 = 10_000;
let cutover_ms = 1_000;
let (mut old_send, old_recv) = xk_pair(1, 2);
let (_new_send, new_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(old_recv);
entry.set_pending_session(new_recv);
assert!(entry.cutover_to_new_session(cutover_ms));
assert!(entry.is_draining());
let k_old = !entry.current_k_bit();
for &t in &[5_000u64, 15_000, 25_000] {
let (ct, counter, hdr) = seal(&mut old_send, b"still-old-epoch", k_old);
let (_pt, slot) = entry
.fsp_trial_decrypt(&ct, counter, &hdr, k_old, t)
.expect("old-epoch frame must still decrypt while peer uses it");
assert_eq!(slot, EpochSlot::Previous);
assert!(
!entry.drain_expired(t, DRAIN_MS),
"previous slot must not be retired while peer keeps using it"
);
assert!(entry.previous_highest_counter().is_some());
}
// Last use was at 25_000, so expiry lands exactly at 35_000.
assert!(!entry.drain_expired(34_999, DRAIN_MS));
assert!(entry.drain_expired(35_000, DRAIN_MS));
entry.complete_drain();
assert!(entry.previous_highest_counter().is_none());
assert!(!entry.is_draining());
}
/// When the previous slot is never used after the cutover, the drain
/// expires exactly `DRAIN_MS` after the cutover time.
#[test]
fn drain_expiry_unaffected_when_peer_off_old_epoch() {
const DRAIN_MS: u64 = 10_000;
let cutover_ms = 1_000;
let (_old_send, old_recv) = xk_pair(1, 2);
let (_new_send, new_recv) = xk_pair(3, 4);
let mut entry = entry_with_current(old_recv);
entry.set_pending_session(new_recv);
assert!(entry.cutover_to_new_session(cutover_ms));
assert!(!entry.drain_expired(cutover_ms + DRAIN_MS - 1, DRAIN_MS));
assert!(entry.drain_expired(cutover_ms + DRAIN_MS, DRAIN_MS));
}
}