use phantom_protocol::crypto::adaptive_crypto::{CipherSuite, CryptoSession};
use phantom_protocol::security::ReplayWindow;
use phantom_protocol::transport::session::{CryptoState, Session, MAX_REKEY_CATCHUP};
use phantom_protocol::transport::types::{
PacketFlags, PacketHeader, PhantomPacket, SchedulerMode, SessionId, WIRE_VERSION,
};
use proptest::prelude::*;
/// Builds two [`Session`]s derived from the same 32-byte `secret` and a fixed
/// session id (`0x5A` repeated), returned as `(client, server)`.
///
/// The two sides differ only in the boolean handed to [`CryptoState::new`] and
/// [`Session::from_derived`]: `false` for the first element, `true` for the
/// second. The client/server naming follows the `expect` messages below and the
/// way the tests in this file destructure the pair.
///
/// # Panics
///
/// Panics if [`CryptoState::new`] returns an error for either side.
fn session_pair(secret: [u8; 32]) -> (Session, Session) {
    let id = SessionId::from_bytes([0x5Au8; 32]);

    // First side: role flag `false`.
    let client_crypto = CryptoState::new(&secret, false).expect("client crypto");
    let client = Session::from_derived(
        id,
        client_crypto,
        SchedulerMode::LowLatency,
        secret,
        false,
    );

    // Second side: identical inputs, role flag `true`.
    let server_crypto = CryptoState::new(&secret, true).expect("server crypto");
    let server = Session::from_derived(
        id,
        server_crypto,
        SchedulerMode::LowLatency,
        secret,
        true,
    );

    (client, server)
}
// AEAD properties for `CryptoSession` using the AES-256-GCM suite.
proptest! {
    // Whatever one side encrypts under (secret, aad), the session built with
    // `with_suite_peer` from the same secret must decrypt back to the exact
    // plaintext. Covers empty plaintext and empty AAD (both ranges start at 0).
    #[test]
    fn aead_round_trip(
        secret in proptest::array::uniform32(any::<u8>()),
        plaintext in proptest::collection::vec(any::<u8>(), 0..2048),
        aad in proptest::collection::vec(any::<u8>(), 0..256),
    ) {
        let sender = CryptoSession::with_suite(&secret, CipherSuite::Aes256Gcm)
            .expect("init send");
        let receiver = CryptoSession::with_suite_peer(&secret, CipherSuite::Aes256Gcm)
            .expect("init recv");

        let ciphertext = sender.encrypt(&aad, &plaintext).expect("encrypt");
        let recovered = receiver.decrypt(&aad, &ciphertext).expect("decrypt");

        prop_assert_eq!(recovered, plaintext);
    }

    // Decrypting with associated data that differs from what was used at
    // encryption time must fail. The AAD strategy starts at length 1 so that
    // index 0 always exists for the bit flip.
    #[test]
    fn aead_aad_mismatch_rejects(
        secret in proptest::array::uniform32(any::<u8>()),
        plaintext in proptest::collection::vec(any::<u8>(), 0..2048),
        aad in proptest::collection::vec(any::<u8>(), 1..256),
    ) {
        // Same AAD with the lowest bit of the first byte inverted.
        let mut tampered_aad = aad.clone();
        tampered_aad[0] ^= 0x01;

        let sender = CryptoSession::with_suite(&secret, CipherSuite::Aes256Gcm)
            .expect("init send");
        let receiver = CryptoSession::with_suite_peer(&secret, CipherSuite::Aes256Gcm)
            .expect("init recv");

        let ciphertext = sender.encrypt(&aad, &plaintext).expect("encrypt");
        let outcome = receiver.decrypt(&tampered_aad, &ciphertext);

        prop_assert!(outcome.is_err());
    }
}
// Properties of `ReplayWindow::accept`.
proptest! {
    // Feeding strictly increasing, duplicate-free sequence numbers into a fresh
    // window must accept every one of them.
    #[test]
    fn replay_window_accepts_monotonic(
        starts in proptest::collection::vec(0u32..1_000_000_u32, 1..32),
    ) {
        // A BTreeSet yields its elements in ascending order without
        // duplicates, i.e. a strictly increasing sequence.
        let ascending: std::collections::BTreeSet<u32> = starts.into_iter().collect();

        let mut window = ReplayWindow::new();
        for seq in ascending {
            prop_assert!(window.accept(seq), "monotonic seq {} must be accepted", seq);
        }
    }

    // After accepting `base`, an older sequence number `base - offset` is
    // accepted once and rejected when presented a second time.
    //
    // `base >= 1024` and `offset <= 1022`, so the subtraction cannot underflow.
    // NOTE(review): the first `accept(older)` succeeding presumes the window
    // spans at least 1022 sequence numbers behind the highest seen — confirm
    // against ReplayWindow's width.
    #[test]
    fn replay_window_rejects_duplicates(
        base in 1024u32..(u32::MAX / 2),
        offset in 1u32..1023,
    ) {
        let mut window = ReplayWindow::new();
        prop_assert!(window.accept(base));

        let older = base - offset;
        // First sighting of the older number: accepted.
        prop_assert!(window.accept(older));
        // Second sighting of the same number: must be refused.
        prop_assert!(!window.accept(older));
    }
}
// Wire-format properties of `PhantomPacket::to_wire` / `PhantomPacket::from_wire`.
proptest! {
    // Encoding a packet and decoding the bytes again must reproduce the header
    // fields checked below and the payload, for arbitrary field values.
    //
    // NOTE(review): `sid_bytes` is randomised but the decoded session id is
    // never compared against it — consider asserting it as well.
    #[test]
    fn wire_round_trip_preserves_fields(
        sid_bytes in proptest::array::uniform32(any::<u8>()),
        stream_id in any::<u16>(),
        sequence in any::<u32>(),
        flags_bits in any::<u16>(),
        epoch in any::<u8>(),
        path_id in any::<u8>(),
        payload in proptest::collection::vec(any::<u8>(), 0..4096),
    ) {
        let sid = SessionId::from_bytes(sid_bytes);
        let flags = PacketFlags::new(flags_bits);
        let hdr = PacketHeader::new(sid, stream_id, sequence, flags)
            .with_epoch(epoch)
            .with_path_id(path_id);

        let original = PhantomPacket::new(hdr, payload.clone());
        let wire = original.to_wire();
        let parsed =
            PhantomPacket::from_wire(&wire).expect("round-trip decode must succeed");

        // The version is not an input; it is expected to equal WIRE_VERSION.
        prop_assert_eq!(parsed.header.version, WIRE_VERSION);
        prop_assert_eq!(parsed.header.stream_id, stream_id);
        prop_assert_eq!(parsed.header.sequence, sequence);
        prop_assert_eq!(parsed.header.flags.0, flags_bits);
        prop_assert_eq!(parsed.header.epoch, epoch);
        prop_assert_eq!(parsed.header.path_id, path_id);
        prop_assert_eq!(parsed.payload, payload);
    }

    // With a fixed session id, stream 3 and the ENCRYPTED flag, both the
    // payload and the `extensions` bytes must survive an encode/decode cycle,
    // including when either of them is empty.
    #[test]
    fn wire_round_trip_preserves_payload_and_extensions(
        sequence in any::<u32>(),
        payload in proptest::collection::vec(any::<u8>(), 0..1024),
        extensions in proptest::collection::vec(any::<u8>(), 0..512),
    ) {
        let sid = SessionId::from_bytes([0x11u8; 32]);
        let flags = PacketFlags::new(PacketFlags::ENCRYPTED);
        let hdr = PacketHeader::new(sid, 3, sequence, flags);

        let mut original = PhantomPacket::new(hdr, payload.clone());
        original.extensions = extensions.clone();

        let wire = original.to_wire();
        let parsed = PhantomPacket::from_wire(&wire).expect("round-trip decode");

        prop_assert_eq!(parsed.payload, payload);
        prop_assert_eq!(parsed.extensions, extensions);
    }

    // Robustness: decoding arbitrary input (0 to 8191 bytes) may return
    // anything, but it must return rather than panic. The result itself is
    // deliberately discarded.
    #[test]
    fn from_wire_never_panics_on_arbitrary_bytes(
        buf in proptest::collection::vec(any::<u8>(), 0..8192),
    ) {
        let _ = PhantomPacket::from_wire(&buf);
    }
}
// Rekey properties of a client/server `Session` pair built by `session_pair`.
proptest! {
    // When both sides rekey the same number of times (0 to 39), both report
    // epoch `n` and a packet encrypted by the client at that epoch decrypts on
    // the server.
    #[test]
    fn rekey_chains_stay_in_lockstep(
        secret in proptest::array::uniform32(any::<u8>()),
        n in 0u8..40,
    ) {
        let (client, server) = session_pair(secret);

        // Advance both sides together, one step at a time.
        for _round in 0..n {
            client.rekey().expect("client rekey");
            server.rekey().expect("server rekey");
        }

        prop_assert_eq!(client.current_epoch(), n);
        prop_assert_eq!(server.current_epoch(), n);

        let message = b"chain";
        let flags = PacketFlags::new(PacketFlags::ENCRYPTED);
        let hdr = PacketHeader::new(*server.id(), 1, 1, flags).with_epoch(n);

        let sealed = client.encrypt_packet(&hdr, message).expect("encrypt");
        let opened = server
            .decrypt_packet(&hdr, &sealed)
            .expect("decrypt at matched epoch");

        prop_assert_eq!(opened, message.to_vec());
    }

    // Only the client rekeys, `steps` times (1 up to MAX_REKEY_CATCHUP). A
    // packet stamped with epoch `steps` and flagged ENCRYPTED | REKEY must
    // still decrypt through `decrypt_packet_accepting_rekey`, after which the
    // server reports epoch `steps` too.
    #[test]
    fn accepting_decrypt_follows_any_bounded_forward_step(
        secret in proptest::array::uniform32(any::<u8>()),
        steps in 1u8..=MAX_REKEY_CATCHUP,
    ) {
        let (client, server) = session_pair(secret);

        // The server is intentionally left behind here.
        for _round in 0..steps {
            client.rekey().expect("client rekey");
        }

        let message = b"forward";
        let flags = PacketFlags::new(PacketFlags::ENCRYPTED | PacketFlags::REKEY);
        let hdr = PacketHeader::new(*server.id(), 1, 1, flags).with_epoch(steps);

        let sealed = client.encrypt_packet(&hdr, message).expect("encrypt");
        let opened = server
            .decrypt_packet_accepting_rekey(&hdr, &sealed)
            .expect("accepting decrypt follows a bounded forward step");

        prop_assert_eq!(opened, message.to_vec());
        prop_assert_eq!(server.current_epoch(), steps);
    }
}
// Nonce-uniqueness property under sequence-watermark-driven rekeys.
proptest! {
    // Simulates `sends` consecutive sequence numbers on one stream. The rekey
    // threshold is set to u64::MAX and the sequence watermark to `watermark`;
    // the client rekeys whenever `send_needs_rekey` or `stream_seq_needs_rekey`
    // says so. Two things are checked:
    //   1. no (epoch, stream, seq) tuple is observed twice;
    //   2. the sequence numbers seen within one epoch span at most `watermark`.
    #[test]
    fn no_nonce_repeats_across_forced_rekeys(
        secret in any::<[u8; 32]>(),
        watermark in 1u32..=64,
        sends in 1u32..2000,
        stream in any::<u16>(),
    ) {
        // The server half is unused but kept alive for the whole test.
        let (client, _server) = session_pair(secret);
        client.set_rekey_threshold(u64::MAX);
        client.set_seq_rekey_watermark(watermark);

        // epoch -> (lowest seq, highest seq) observed in that epoch.
        let mut spans = std::collections::BTreeMap::<u8, (u32, u32)>::new();
        // Every (epoch, stream, seq) tuple observed so far.
        let mut seen = std::collections::HashSet::<(u8, u16, u32)>::new();

        for seq in 0u32..sends {
            let rekey_due =
                client.send_needs_rekey() || client.stream_seq_needs_rekey(stream, seq);
            // A failed rekey ends the simulation early instead of failing the
            // test (presumably the session cannot advance any further — confirm).
            if rekey_due && client.rekey().is_err() {
                break;
            }

            let epoch = client.current_epoch();

            // NOTE(review): `seq` is unique per loop iteration, so this insert
            // cannot collide as written; the per-epoch span check below is the
            // assertion that actually constrains behaviour. Confirm whether
            // sequence numbers were meant to restart after a rekey.
            prop_assert!(
                seen.insert((epoch, stream, seq)),
                "nonce tuple (epoch={}, stream={}, seq={}) repeated",
                epoch, stream, seq
            );

            // Widen the recorded span for this epoch, or start a new one.
            spans
                .entry(epoch)
                .and_modify(|span| {
                    span.0 = span.0.min(seq);
                    span.1 = span.1.max(seq);
                })
                .or_insert((seq, seq));
        }

        for (&epoch, &(lo, hi)) in &spans {
            let width = hi - lo;
            prop_assert!(
                width <= watermark,
                "epoch {} spans {} sequences > watermark {}",
                epoch, width, watermark
            );
        }
    }
}