use std::io;
use std::net::{SocketAddr, UdpSocket};
use varta_vlp::crypto::{self, Key, NONCE_BYTES, SECURE_FRAME_MASTER_BYTES};
use crate::transport::{bind_ephemeral, BeatTransport};
const SECURE_FRAME_LEN: usize = crypto::SECURE_FRAME_BYTES;
const SECURE_FRAME_MASTER_LEN: usize = SECURE_FRAME_MASTER_BYTES;
pub struct SecureUdpTransport {
sock: UdpSocket,
addr: SocketAddr,
key: Key,
iv_counter: u32,
iv_session_salt: [u8; 16],
iv_prefix_index: u32,
iv_prefix: [u8; 8],
is_master_mode: bool,
}
impl SecureUdpTransport {
pub fn connect(addr: SocketAddr, key: Key) -> io::Result<Self> {
use varta_vlp::crypto::kdf;
let sock = bind_ephemeral(&addr)?;
sock.connect(addr)?;
sock.set_nonblocking(true)?;
let iv_session_salt = read_iv_session_salt()?;
let iv_prefix = kdf::derive_iv_prefix(&iv_session_salt, 0)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "key derivation failure"))?;
Ok(SecureUdpTransport {
sock,
addr,
key,
iv_counter: 0,
iv_session_salt,
iv_prefix_index: 0,
iv_prefix,
is_master_mode: false,
})
}
pub fn connect_with_master(addr: SocketAddr, master_key: Key) -> io::Result<Self> {
use varta_vlp::crypto::kdf;
let peer_pid = std::process::id();
let agent_key = kdf::derive_agent_key(&master_key, peer_pid)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "key derivation failure"))?;
let sock = bind_ephemeral(&addr)?;
sock.connect(addr)?;
sock.set_nonblocking(true)?;
let iv_session_salt = read_iv_session_salt()?;
let iv_prefix = kdf::derive_iv_prefix(&iv_session_salt, 0)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "key derivation failure"))?;
Ok(SecureUdpTransport {
sock,
addr,
key: agent_key,
iv_counter: 0,
iv_session_salt,
iv_prefix_index: 0,
iv_prefix,
is_master_mode: true,
})
}
#[cfg(any(test, feature = "test-hooks"))]
pub fn set_iv_counter_for_test(&mut self, value: u32) {
self.iv_counter = value;
}
#[cfg(any(test, feature = "test-hooks"))]
pub fn iv_counter_for_test(&self) -> u32 {
self.iv_counter
}
#[cfg(any(test, feature = "test-hooks"))]
pub fn iv_prefix_for_test(&self) -> [u8; 8] {
self.iv_prefix
}
#[cfg(any(test, feature = "test-hooks"))]
pub fn iv_prefix_index_for_test(&self) -> u32 {
self.iv_prefix_index
}
#[cfg(any(test, feature = "test-hooks"))]
pub fn set_iv_prefix_index_for_test(&mut self, value: u32) {
self.iv_prefix_index = value;
}
fn advance_nonce(&mut self) -> io::Result<u32> {
if let Some(n) = self.iv_counter.checked_add(1) {
return Ok(n);
}
if let Some(next_index) = self.iv_prefix_index.checked_add(1) {
self.iv_prefix_index = next_index;
self.iv_prefix = varta_vlp::crypto::kdf::derive_iv_prefix(
&self.iv_session_salt,
self.iv_prefix_index,
)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "key derivation failure"))?;
return Ok(1);
}
self.reconnect()?;
debug_assert_eq!(
self.iv_counter, 0,
"reconnect() must zero iv_counter — see secure_transport module docs"
);
debug_assert_eq!(
self.iv_prefix_index, 0,
"reconnect() must zero iv_prefix_index — see secure_transport module docs"
);
Ok(1)
}
}
impl BeatTransport for SecureUdpTransport {
fn send(&mut self, buf: &[u8; 32]) -> io::Result<usize> {
let pending_counter = self.advance_nonce()?;
let mut nonce = [0u8; NONCE_BYTES];
nonce[..8].copy_from_slice(&self.iv_prefix);
nonce[8..12].copy_from_slice(&pending_counter.to_le_bytes());
let result = if self.is_master_mode {
let agent_pid = std::process::id();
let agent_pid_bytes = agent_pid.to_le_bytes();
let (ciphertext, tag) =
crypto::seal(self.key.as_bytes(), &nonce, &agent_pid_bytes, buf)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "AEAD seal failure"))?;
let mut frame = [0u8; SECURE_FRAME_MASTER_LEN];
frame[0..4].copy_from_slice(&agent_pid_bytes);
frame[4..12].copy_from_slice(&self.iv_prefix);
frame[12..16].copy_from_slice(&pending_counter.to_le_bytes());
frame[16..48].copy_from_slice(&ciphertext);
frame[48..64].copy_from_slice(&tag);
self.sock.send(&frame)
} else {
let (ciphertext, tag) = crypto::seal(self.key.as_bytes(), &nonce, b"", buf)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "AEAD seal failure"))?;
let mut frame = [0u8; SECURE_FRAME_LEN];
frame[..8].copy_from_slice(&self.iv_prefix);
frame[8..12].copy_from_slice(&pending_counter.to_le_bytes());
frame[12..44].copy_from_slice(&ciphertext);
frame[44..60].copy_from_slice(&tag);
self.sock.send(&frame)
};
if result.is_ok() {
self.iv_counter = pending_counter;
}
result
}
fn reconnect(&mut self) -> io::Result<()> {
use varta_vlp::crypto::kdf;
let sock = bind_ephemeral(&self.addr)?;
sock.connect(self.addr)?;
sock.set_nonblocking(true)?;
let new_salt = read_iv_session_salt()?;
let new_prefix = kdf::derive_iv_prefix(&new_salt, 0)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "key derivation failure"))?;
self.sock = sock;
self.iv_session_salt = new_salt;
self.iv_prefix = new_prefix;
self.iv_prefix_index = 0;
self.iv_counter = 0;
Ok(())
}
}
#[cfg(target_os = "linux")]
#[allow(unsafe_code)]
fn os_random(buf: &mut [u8]) -> io::Result<()> {
extern "C" {
fn getrandom(buf: *mut u8, buflen: usize, flags: u32) -> isize;
}
let mut filled = 0usize;
while filled < buf.len() {
let n = unsafe { getrandom(buf.as_mut_ptr().add(filled), buf.len() - filled, 0) };
if n < 0 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(e);
}
filled += n as usize;
}
Ok(())
}
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
#[allow(unsafe_code)]
fn os_random(buf: &mut [u8]) -> io::Result<()> {
extern "C" {
fn getentropy(buf: *mut u8, buflen: usize) -> i32;
}
assert!(buf.len() <= 256, "getentropy: buflen must be <= 256");
let rc = unsafe { getentropy(buf.as_mut_ptr(), buf.len()) };
if rc == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
#[cfg(not(any(
target_os = "linux",
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
)))]
fn os_random(_buf: &mut [u8]) -> io::Result<()> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"no OS random source on this platform",
))
}
#[cfg_attr(
not(any(test, all(feature = "panic-handler", feature = "secure-udp"))),
allow(dead_code)
)]
pub(crate) fn read_iv_random() -> io::Result<[u8; 8]> {
let mut buf = [0u8; 8];
if os_random(&mut buf).is_ok() {
return Ok(buf);
}
std::fs::File::open("/dev/urandom").and_then(|mut f| {
use std::io::Read;
f.read_exact(&mut buf)
})?;
Ok(buf)
}
pub(crate) fn read_iv_session_salt() -> io::Result<[u8; 16]> {
let mut buf = [0u8; 16];
if os_random(&mut buf).is_ok() {
return Ok(buf);
}
std::fs::File::open("/dev/urandom").and_then(|mut f| {
use std::io::Read;
f.read_exact(&mut buf)
})?;
Ok(buf)
}
#[cfg(any(feature = "accept-degraded-entropy", test))]
#[cfg_attr(not(any(test, feature = "accept-degraded-entropy")), allow(dead_code))]
pub(crate) fn fallback_iv_random() -> [u8; 8] {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
static SEQ: AtomicU64 = AtomicU64::new(0);
static START: OnceLock<Instant> = OnceLock::new();
let mut hasher = RandomState::new().build_hasher();
std::process::id().hash(&mut hasher);
std::thread::current().id().hash(&mut hasher);
SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
START
.get_or_init(Instant::now)
.elapsed()
.as_nanos()
.hash(&mut hasher);
if let Ok(d) = SystemTime::now().duration_since(UNIX_EPOCH) {
d.as_nanos().hash(&mut hasher);
}
hasher.finish().to_le_bytes()
}
#[cfg(any(feature = "accept-degraded-entropy", test))]
#[allow(dead_code)]
pub(crate) fn fallback_iv_session_salt() -> [u8; 16] {
let lo = fallback_iv_random();
let hi = fallback_iv_random();
let mut out = [0u8; 16];
out[..8].copy_from_slice(&lo);
out[8..].copy_from_slice(&hi);
out
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{Ipv6Addr, SocketAddrV6};
#[test]
fn ipv6_connect_does_not_fail_with_einval() {
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
let key = Key::from_bytes([0x42; 32]);
let result = SecureUdpTransport::connect(addr, key);
assert!(result.is_ok(), "IPv6 connect failed: {:?}", result.err());
}
#[test]
fn fallback_iv_random_unique_across_calls() {
use std::collections::HashSet;
let outputs: HashSet<[u8; 8]> = (0..1000).map(|_| fallback_iv_random()).collect();
assert_eq!(
outputs.len(),
1000,
"collisions detected in fallback_iv_random"
);
}
#[test]
fn os_random_yields_distinct_outputs() {
let mut a = [0u8; 32];
let mut b = [0u8; 32];
match (os_random(&mut a), os_random(&mut b)) {
(Ok(()), Ok(())) => assert_ne!(a, b, "os_random returned identical outputs"),
(Err(e), _) | (_, Err(e)) if e.kind() == io::ErrorKind::Unsupported => {}
(Err(e), _) | (_, Err(e)) => panic!("os_random failed: {e}"),
}
}
#[test]
fn read_iv_random_succeeds() {
assert!(
read_iv_random().is_ok(),
"read_iv_random failed on this platform"
);
}
#[test]
fn fallback_iv_session_salt_unique_across_calls() {
use std::collections::HashSet;
let outputs: HashSet<[u8; 16]> = (0..1000).map(|_| fallback_iv_session_salt()).collect();
assert_eq!(
outputs.len(),
1000,
"collisions detected in fallback_iv_session_salt"
);
}
#[test]
fn read_iv_session_salt_succeeds() {
assert!(
read_iv_session_salt().is_ok(),
"read_iv_session_salt failed on this platform"
);
}
#[test]
fn counter_wrap_rotates_prefix_without_entropy_read() {
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
let key = Key::from_bytes([0u8; 32]);
let mut tx = SecureUdpTransport::connect(addr, key).expect("connect");
let prefix_before = tx.iv_prefix_for_test();
let salt_before = tx.iv_session_salt;
tx.set_iv_counter_for_test(u32::MAX);
let buf = [0u8; 32];
let _ = <SecureUdpTransport as BeatTransport>::send(&mut tx, &buf);
assert_eq!(
tx.iv_session_salt, salt_before,
"salt rotated unexpectedly on wrap"
);
assert_eq!(
tx.iv_prefix_index_for_test(),
1,
"prefix_index should advance to 1 on wrap"
);
assert_ne!(
tx.iv_prefix_for_test(),
prefix_before,
"rotated prefix should differ from prior prefix"
);
assert_eq!(tx.iv_counter, 1, "counter should reset to 1 on wrap");
}
#[test]
fn wrap_path_does_not_call_read_iv_session_salt() {
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
let key = Key::from_bytes([0u8; 32]);
let mut tx = SecureUdpTransport::connect(addr, key).expect("connect");
let salt_snapshot = tx.iv_session_salt;
for expected_index in 1..=4 {
tx.set_iv_counter_for_test(u32::MAX);
let buf = [0u8; 32];
let _ = <SecureUdpTransport as BeatTransport>::send(&mut tx, &buf);
assert_eq!(
tx.iv_session_salt, salt_snapshot,
"salt mutated during wrap rotation (regression)"
);
assert_eq!(tx.iv_prefix_index_for_test(), expected_index);
}
}
#[test]
fn doubly_exhausted_nonce_falls_back_to_reconnect() {
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
let key = Key::from_bytes([0u8; 32]);
let mut tx = SecureUdpTransport::connect(addr, key).expect("connect");
let salt_before = tx.iv_session_salt;
let prefix_before = tx.iv_prefix_for_test();
tx.set_iv_counter_for_test(u32::MAX);
tx.set_iv_prefix_index_for_test(u32::MAX);
let buf = [0u8; 32];
let _ = <SecureUdpTransport as BeatTransport>::send(&mut tx, &buf);
assert_ne!(
tx.iv_session_salt, salt_before,
"reconnect should refresh the session salt on double exhaustion"
);
assert_eq!(tx.iv_prefix_index_for_test(), 0);
assert_eq!(tx.iv_counter, 1);
assert_ne!(tx.iv_prefix_for_test(), prefix_before);
}
#[test]
fn reconnect_success_updates_all_iv_state_and_socket_port() {
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
let key = Key::from_bytes([0x42; 32]);
let mut tx = SecureUdpTransport::connect(addr, key).expect("connect");
tx.set_iv_counter_for_test(123);
tx.set_iv_prefix_index_for_test(7);
let port_before = tx.sock.local_addr().expect("local_addr").port();
let salt_before = tx.iv_session_salt;
let prefix_before = tx.iv_prefix_for_test();
<SecureUdpTransport as BeatTransport>::reconnect(&mut tx)
.expect("reconnect on loopback must succeed");
assert_eq!(tx.iv_counter, 0, "iv_counter must reset to 0");
assert_eq!(
tx.iv_prefix_index_for_test(),
0,
"iv_prefix_index must reset to 0"
);
assert_ne!(
tx.iv_session_salt, salt_before,
"salt must be re-read from OS entropy (1-in-2^128 collision)"
);
assert_ne!(
tx.iv_prefix_for_test(),
prefix_before,
"prefix must be re-derived from the new salt"
);
assert_ne!(
tx.sock.local_addr().expect("local_addr").port(),
port_before,
"ephemeral source port must differ after re-bind"
);
}
#[test]
fn iv_counter_commits_only_on_successful_send() {
use std::mem;
use std::net::{Ipv4Addr, UdpSocket};
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
let key = Key::from_bytes([0u8; 32]);
let mut tx = SecureUdpTransport::connect(addr, key).expect("connect");
let baseline = tx.iv_counter;
let buf = [0u8; 32];
let ok = <SecureUdpTransport as BeatTransport>::send(&mut tx, &buf);
assert!(
ok.is_ok(),
"baseline send on connected socket failed: {ok:?}"
);
assert_eq!(
tx.iv_counter,
baseline + 1,
"successful send must advance iv_counter by exactly 1"
);
let unconnected =
UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).expect("bind unconnected UDP socket");
unconnected
.set_nonblocking(true)
.expect("set_nonblocking on unconnected");
let _replaced = mem::replace(&mut tx.sock, unconnected);
let counter_before = tx.iv_counter;
let prefix_before = tx.iv_prefix_for_test();
let prefix_index_before = tx.iv_prefix_index_for_test();
for attempt in 0..5 {
let r = <SecureUdpTransport as BeatTransport>::send(&mut tx, &buf);
assert!(
r.is_err(),
"send #{attempt} on unconnected socket unexpectedly succeeded: {r:?}"
);
}
assert_eq!(
tx.iv_counter, counter_before,
"iv_counter advanced despite send() failures \
(commit-on-success contract violated)"
);
assert_eq!(
tx.iv_prefix_for_test(),
prefix_before,
"iv_prefix mutated on failed send"
);
assert_eq!(
tx.iv_prefix_index_for_test(),
prefix_index_before,
"iv_prefix_index mutated on failed send"
);
}
#[test]
fn manual_reconnect_does_re_read_entropy() {
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9876, 0, 0));
let key = Key::from_bytes([0u8; 32]);
let mut tx = SecureUdpTransport::connect(addr, key).expect("connect");
let salt_before = tx.iv_session_salt;
let prefix_before = tx.iv_prefix_for_test();
tx.iv_prefix_index = 42;
tx.iv_counter = 12345;
<SecureUdpTransport as BeatTransport>::reconnect(&mut tx).expect("reconnect");
assert_eq!(tx.iv_prefix_index_for_test(), 0);
assert_eq!(tx.iv_counter, 0);
assert_ne!(
tx.iv_session_salt, salt_before,
"reconnect should refresh the session salt"
);
assert_ne!(tx.iv_prefix_for_test(), prefix_before);
}
}