pub mod driver;
pub mod receive;
pub mod send;
pub mod util;
use std::io;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
pub use crate::entropy::receive::EntropyReceiver;
pub use crate::entropy::send::{EntropySender, RedisLocalSnapshot};
pub use crate::entropy::util::{
EntropyIv, EntropyKey, EntropyMaterial, ENTROPY_IV_LEN, ENTROPY_KEY_LEN,
};
/// Moves `source` behind an [`Arc`] and returns it as a type-erased
/// [`BoxedSnapshotSource`].
///
/// Despite the name, the returned handle is reference-counted (`Arc`), not a
/// `Box`. The `'static` bound is required because the concrete type is hidden
/// behind a trait object.
#[must_use]
pub fn boxed_source<S>(source: S) -> BoxedSnapshotSource
where
    S: SnapshotSource + 'static,
{
    Arc::new(source)
}
/// Moves `sink` behind an [`Arc`] and returns it as a type-erased
/// [`BoxedSnapshotSink`].
///
/// Despite the name, the returned handle is reference-counted (`Arc`), not a
/// `Box`. The `'static` bound is required because the concrete type is hidden
/// behind a trait object.
#[must_use]
pub fn boxed_sink<S>(sink: S) -> BoxedSnapshotSink
where
    S: SnapshotSink + 'static,
{
    Arc::new(sink)
}
/// Value the `magic` field of a [`NegotiationHeader`] must carry;
/// [`NegotiationHeader::from_wire`] rejects anything else.
pub const ENTROPY_MAGIC: u32 = 0x6464_0001;
/// The only `command` value accepted by [`NegotiationHeader::from_wire`].
pub const ENTROPY_COMMAND_SEND: u32 = 1;
/// Default buffer size in bytes (16 KiB).
pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024;
/// Default cipher size in bytes: [`DEFAULT_BUFFER_SIZE`] plus 1 KiB.
// NOTE(review): the extra 1 KiB is presumably headroom for ciphertext
// expansion — confirm against the send/receive code.
pub const DEFAULT_CIPHER_SIZE: usize = DEFAULT_BUFFER_SIZE + 1024;
/// Default header size in bytes (1 KiB).
pub const DEFAULT_HEADER_SIZE: usize = 1024;
/// Largest `header_size` accepted by [`EntropyConfig::validate`] and
/// [`NegotiationHeader::from_wire`].
pub const MAX_HEADER_SIZE: usize = 1024;
/// Largest `buffer_size` (5 MiB) accepted by [`EntropyConfig::validate`] and
/// [`NegotiationHeader::from_wire`].
pub const MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024;
/// Largest `cipher_size` (5 MiB) accepted by [`NegotiationHeader::from_wire`].
// NOTE(review): this equals MAX_BUFFER_SIZE, while DEFAULT_CIPHER_SIZE is the
// buffer size plus 1 KiB. If cipher_size is always derived that way, a
// maximal buffer_size would produce a cipher_size the peer rejects — confirm.
pub const MAX_CIPHER_SIZE: usize = 5 * 1024 * 1024;
/// `u32::MAX - 1`, expressed as `usize`.
// NOTE(review): not referenced in this part of the file; presumably the
// largest snapshot length that may be sent — confirm against the callers.
pub const MAX_SNAPSHOT_SIZE: usize = u32::MAX as usize - 1;
/// 16 MiB.
// NOTE(review): not referenced in this part of the file; presumably a cap on
// up-front buffer allocation when receiving — confirm against the callers.
pub const SAFE_PREALLOC: usize = 16 * 1024 * 1024;
/// Configuration record for the entropy subsystem.
///
/// [`EntropyConfig::validate`] checks `buffer_size`, `header_size` and the
/// interaction between `buffer_size` and `encrypt`; the other fields are not
/// inspected there.
#[derive(Debug, Clone)]
pub struct EntropyConfig {
/// Path to the key file.
// NOTE(review): presumably holds the encryption key; it is not read in this
// part of the file — confirm against the key-loading code.
pub key_file: PathBuf,
/// Path to the IV file.
// NOTE(review): presumably holds the initialization vector — confirm.
pub iv_file: PathBuf,
/// Socket address to listen on.
// NOTE(review): presumably the receiver's bind address — confirm.
pub listen_addr: SocketAddr,
/// Optional socket address used for sending.
// NOTE(review): the meaning of `None` is not visible here — confirm.
pub send_addr: Option<SocketAddr>,
/// Socket address of the peer.
pub peer_endpoint: SocketAddr,
/// Buffer size in bytes; must be in `1..=MAX_BUFFER_SIZE`, and a multiple of
/// 16 when `encrypt` is set.
pub buffer_size: usize,
/// Header size in bytes; must be in `8..=MAX_HEADER_SIZE`.
pub header_size: usize,
/// Whether encryption is enabled. When `true`, `validate` additionally
/// requires `buffer_size` to be a multiple of 16.
pub encrypt: bool,
}
impl EntropyConfig {
    /// Checks the size-related settings of this configuration.
    ///
    /// The rules are evaluated in order and the first violation is reported:
    ///
    /// 1. `buffer_size` must lie in `1..=MAX_BUFFER_SIZE`.
    /// 2. `header_size` must lie in `8..=MAX_HEADER_SIZE`.
    /// 3. With `encrypt` set, `buffer_size` must be a multiple of 16.
    ///
    /// # Errors
    ///
    /// Returns [`EntropyError::Config`] describing the first rule that failed.
    pub fn validate(&self) -> Result<(), EntropyError> {
        let violation = if !(1..=MAX_BUFFER_SIZE).contains(&self.buffer_size) {
            Some(format!(
                "buffer_size {} out of range (1..={MAX_BUFFER_SIZE})",
                self.buffer_size
            ))
        } else if !(8..=MAX_HEADER_SIZE).contains(&self.header_size) {
            Some(format!(
                "header_size {} out of range (8..={MAX_HEADER_SIZE})",
                self.header_size
            ))
        } else if self.encrypt && !self.buffer_size.is_multiple_of(16) {
            Some(format!(
                "buffer_size {} must be a multiple of 16 with encryption enabled",
                self.buffer_size
            ))
        } else {
            None
        };

        // Single exit point: wrap whichever message was produced, if any.
        match violation {
            Some(message) => Err(EntropyError::Config(message)),
            None => Ok(()),
        }
    }
}
/// A producer of snapshot bytes.
///
/// Implementors must be `Send + Sync`.
pub trait SnapshotSource: Send + Sync {
/// Returns a snapshot as an owned byte vector, or an [`EntropyError`] on
/// failure.
fn snapshot(&self) -> Result<Vec<u8>, EntropyError>;
}
/// Type-erased, reference-counted [`SnapshotSource`] handle (an `Arc`, despite
/// the `Boxed` prefix).
pub type BoxedSnapshotSource = Arc<dyn SnapshotSource>;
/// Lets any `Arc` around a [`SnapshotSource`] (including
/// `Arc<dyn SnapshotSource>`) be used as a source itself.
impl<T: SnapshotSource + ?Sized> SnapshotSource for Arc<T> {
    /// Delegates to the wrapped value's `snapshot` and returns its result
    /// unchanged.
    fn snapshot(&self) -> Result<Vec<u8>, EntropyError> {
        // Name `T` explicitly so the call targets the inner implementation;
        // `&Arc<T>` deref-coerces to `&T` at the argument.
        T::snapshot(self)
    }
}
/// A consumer of snapshot bytes.
///
/// Implementors must be `Send + Sync`.
pub trait SnapshotSink: Send + Sync {
/// Applies the given snapshot bytes, returning an [`EntropyError`] on
/// failure. Takes `&self`, so implementations that mutate state need their
/// own interior mutability.
fn apply(&self, snapshot: &[u8]) -> Result<(), EntropyError>;
}
/// Type-erased, reference-counted [`SnapshotSink`] handle (an `Arc`, despite
/// the `Boxed` prefix).
pub type BoxedSnapshotSink = Arc<dyn SnapshotSink>;
/// Lets any `Arc` around a [`SnapshotSink`] (including
/// `Arc<dyn SnapshotSink>`) be used as a sink itself.
impl<T: SnapshotSink + ?Sized> SnapshotSink for Arc<T> {
    /// Delegates to the wrapped value's `apply` and returns its result
    /// unchanged.
    fn apply(&self, snapshot: &[u8]) -> Result<(), EntropyError> {
        // Name `T` explicitly so the call targets the inner implementation;
        // `&Arc<T>` deref-coerces to `&T` at the argument.
        T::apply(self, snapshot)
    }
}
/// Error type used throughout the entropy module.
///
/// Every variant's `Display` text starts with `entropy <category>: ` followed
/// by the wrapped detail.
#[derive(Debug, Error)]
pub enum EntropyError {
/// An underlying I/O failure. The `#[from]` attribute provides
/// `From<io::Error>`, so `?` converts `io::Error` values automatically.
#[error("entropy io: {0}")]
Io(#[from] io::Error),
/// Invalid configuration, e.g. a size rejected by
/// [`EntropyConfig::validate`] or [`SnapshotHeader::to_wire`].
#[error("entropy config: {0}")]
Config(String),
/// Problem with key material.
// NOTE(review): not constructed in this part of the file; presumably raised
// while loading the key/IV files — confirm.
#[error("entropy key material: {0}")]
KeyMaterial(String),
/// Malformed or unsupported data received on the wire, e.g. a header
/// rejected by [`NegotiationHeader::from_wire`] or
/// [`SnapshotHeader::from_wire`].
#[error("entropy protocol: {0}")]
Protocol(String),
/// Cryptographic failure.
// NOTE(review): not constructed in this part of the file — confirm usage.
#[error("entropy crypto: {0}")]
Crypto(String),
/// Failure attributed to a snapshot source.
// NOTE(review): presumably returned by `SnapshotSource` implementations —
// confirm.
#[error("entropy source: {0}")]
Source(String),
/// Failure attributed to a snapshot sink.
// NOTE(review): presumably returned by `SnapshotSink` implementations —
// confirm.
#[error("entropy sink: {0}")]
Sink(String),
}
/// Shorthand for a `Result` whose error type is [`EntropyError`].
pub type EntropyResult<T> = Result<T, EntropyError>;
/// Fixed-size negotiation header: five `u32` fields, encoded big-endian on the
/// wire in declaration order.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct NegotiationHeader {
/// Must equal [`ENTROPY_MAGIC`] for the header to parse.
pub magic: u32,
/// Must equal [`ENTROPY_COMMAND_SEND`] for the header to parse.
pub command: u32,
/// Header size in bytes; parsing accepts `8..=MAX_HEADER_SIZE`.
// NOTE(review): presumably the length of the snapshot header that follows
// negotiation — confirm against the send/receive code.
pub header_size: u32,
/// Buffer size in bytes; parsing accepts `1..=MAX_BUFFER_SIZE`.
pub buffer_size: u32,
/// Cipher size in bytes; parsing accepts `1..=MAX_CIPHER_SIZE`.
pub cipher_size: u32,
}
impl NegotiationHeader {
pub const SIZE: usize = 5 * 4;
#[must_use]
pub fn to_wire(self) -> [u8; Self::SIZE] {
let mut out = [0u8; Self::SIZE];
out[0..4].copy_from_slice(&self.magic.to_be_bytes());
out[4..8].copy_from_slice(&self.command.to_be_bytes());
out[8..12].copy_from_slice(&self.header_size.to_be_bytes());
out[12..16].copy_from_slice(&self.buffer_size.to_be_bytes());
out[16..20].copy_from_slice(&self.cipher_size.to_be_bytes());
out
}
pub fn from_wire(bytes: &[u8; Self::SIZE]) -> Result<Self, EntropyError> {
let magic = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
let command = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
let header_size = u32::from_be_bytes(bytes[8..12].try_into().unwrap());
let buffer_size = u32::from_be_bytes(bytes[12..16].try_into().unwrap());
let cipher_size = u32::from_be_bytes(bytes[16..20].try_into().unwrap());
if magic != ENTROPY_MAGIC {
return Err(EntropyError::Protocol(format!(
"bad magic 0x{magic:08x}, expected 0x{ENTROPY_MAGIC:08x}"
)));
}
if command != ENTROPY_COMMAND_SEND {
return Err(EntropyError::Protocol(format!(
"unsupported command {command}"
)));
}
if header_size < 8 || header_size as usize > MAX_HEADER_SIZE {
return Err(EntropyError::Protocol(format!(
"header_size {header_size} out of range"
)));
}
if buffer_size == 0 || buffer_size as usize > MAX_BUFFER_SIZE {
return Err(EntropyError::Protocol(format!(
"buffer_size {buffer_size} out of range"
)));
}
if cipher_size == 0 || cipher_size as usize > MAX_CIPHER_SIZE {
return Err(EntropyError::Protocol(format!(
"cipher_size {cipher_size} out of range"
)));
}
Ok(Self {
magic,
command,
header_size,
buffer_size,
cipher_size,
})
}
}
/// Fixed 8-byte prefix of a snapshot header: two big-endian `u32` fields. On
/// the wire the prefix may be followed by zero padding up to the configured
/// header size.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SnapshotHeader {
/// Length value carried in the first four bytes.
// NOTE(review): presumably the snapshot payload length in bytes; it is not
// range-checked when parsed here — confirm against the send/receive code.
pub total_len: u32,
/// Encryption flag; parsing accepts only `0` or `1`.
// NOTE(review): presumably `1` means the payload is encrypted — confirm.
pub encrypt_flag: u32,
}
impl SnapshotHeader {
pub fn to_wire(self, header_size: usize) -> Result<Vec<u8>, EntropyError> {
if header_size < 8 {
return Err(EntropyError::Config(format!(
"header_size {header_size} smaller than fixed 8-byte prefix"
)));
}
let mut out = vec![0u8; header_size];
out[0..4].copy_from_slice(&self.total_len.to_be_bytes());
out[4..8].copy_from_slice(&self.encrypt_flag.to_be_bytes());
Ok(out)
}
pub fn from_wire(bytes: &[u8]) -> Result<Self, EntropyError> {
if bytes.len() < 8 {
return Err(EntropyError::Protocol(format!(
"snapshot header too short ({} bytes)",
bytes.len()
)));
}
let total_len = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
let encrypt_flag = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
if encrypt_flag > 1 {
return Err(EntropyError::Protocol(format!(
"unknown encrypt_flag {encrypt_flag}"
)));
}
Ok(Self {
total_len,
encrypt_flag,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Negotiation header whose every field passes `NegotiationHeader::from_wire`.
    fn sample_negotiation() -> NegotiationHeader {
        NegotiationHeader {
            magic: ENTROPY_MAGIC,
            command: ENTROPY_COMMAND_SEND,
            header_size: 1024,
            buffer_size: 16 * 1024,
            cipher_size: 17 * 1024,
        }
    }

    #[test]
    fn negotiation_header_round_trips() {
        let sent = sample_negotiation();
        let received = NegotiationHeader::from_wire(&sent.to_wire()).unwrap();
        assert_eq!(received, sent);
    }

    #[test]
    fn negotiation_header_rejects_bad_magic() {
        // Only the magic differs from a valid header, so it must be the
        // reason for the rejection.
        let corrupted = NegotiationHeader {
            magic: 0xdead_beef,
            ..sample_negotiation()
        }
        .to_wire();
        let outcome = NegotiationHeader::from_wire(&corrupted);
        assert!(matches!(outcome, Err(EntropyError::Protocol(_))));
    }

    #[test]
    fn snapshot_header_round_trips() {
        let sent = SnapshotHeader {
            total_len: 4096,
            encrypt_flag: 1,
        };
        let encoded = sent.to_wire(64).unwrap();
        assert_eq!(encoded.len(), 64);
        // Everything after the 8-byte prefix is zero padding.
        assert!(encoded[8..].iter().all(|&byte| byte == 0));
        assert_eq!(SnapshotHeader::from_wire(&encoded).unwrap(), sent);
    }

    #[test]
    fn snapshot_header_rejects_bad_flag() {
        let mut encoded = [0u8; 16];
        encoded[4..8].copy_from_slice(&5u32.to_be_bytes());
        let outcome = SnapshotHeader::from_wire(&encoded);
        assert!(matches!(outcome, Err(EntropyError::Protocol(_))));
    }

    #[test]
    fn config_validate_rejects_zero_buffer() {
        let loopback: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let config = EntropyConfig {
            key_file: PathBuf::from("k"),
            iv_file: PathBuf::from("v"),
            listen_addr: loopback,
            send_addr: None,
            peer_endpoint: loopback,
            buffer_size: 0,
            header_size: 64,
            encrypt: true,
        };
        assert!(config.validate().is_err());
    }
}