dynomite/entropy/mod.rs
1//! Entropy reconciliation channel.
2//!
3//! The entropy module ships a snapshot of the local datastore to a
4//! peer reconciliation engine (and vice versa) over a TCP
5//! connection. Each chunk of the snapshot is encrypted with
//! AES-128-CBC using a pre-shared key and IV loaded from the
//! `recon_key.pem` and `recon_iv.pem` files named in the configuration
//! (see [`EntropyConfig::key_file`] and [`EntropyConfig::iv_file`]). The
//! key and IV used here are independent of the per-peer
//! AES session key established during the DNODE handshake; the
//! entropy channel uses its own pre-shared material.
11//!
12//! # Layout
13//!
14//! * [`util`] holds the on-disk key/IV loaders and validated
15//! wrappers.
16//! * [`send`] drives the client side: connect, write the
17//! negotiation header, stream the snapshot.
18//! * [`receive`] drives the server side: bind, accept, replay the
19//! plaintext into a [`SnapshotSink`].
20//! * The top-level [`EntropyConfig`] gathers the operator-visible
21//! knobs.
22//!
23//! # Wire format
24//!
25//! The reference engine's reconciliation protocol consists of two
26//! sub-protocols (a file-stream `snd` path and a per-record `rcv`
27//! path) that are not inverses of each other. The Rust port uses a
28//! single, length-prefixed framing that subsumes the `snd` shape
29//! and is symmetric across sender and receiver. See
30//! [`docs/parity.md`](../../../docs/parity.md) for the precise
31//! divergence list.
32//!
33//! ```text
34//! Negotiation header (20 bytes, big-endian u32 fields):
35//! magic = 0x64640001
36//! command = 1 (SEND: data flowing sender -> receiver)
37//! header_size = N (size in bytes of the snapshot header below)
38//! buffer_size = M (max plaintext bytes per chunk)
39//! cipher_size = C (max ciphertext bytes per chunk)
40//!
41//! Snapshot header (header_size bytes, big-endian u32 fields then
42//! zero-padded):
43//! total_len = L (total plaintext length in bytes)
44//! encrypt_flag = 0|1 (1 if chunks are AES-128-CBC encrypted)
45//! <header_size - 8 bytes of zero padding>
46//!
47//! Chunks (repeated until L plaintext bytes have been delivered):
48//! u32 BE chunk_len
49//! <chunk_len bytes of payload>
50//!
51//! When encrypt_flag is set each payload is the AES-128-CBC
52//! ciphertext of the matching plaintext chunk, PKCS#7-padded to a
53//! 16-byte boundary; the receiver strips the padding before
54//! delivering bytes to its sink.
55//! ```
56//!
57//! # Example
58//!
59//! ```no_run
60//! use std::path::PathBuf;
61//! use dynomite::entropy::{EntropyConfig, EntropyReceiver, EntropySender};
62//!
63//! let cfg = EntropyConfig {
64//! key_file: PathBuf::from("/etc/dynomite/recon_key.pem"),
65//! iv_file: PathBuf::from("/etc/dynomite/recon_iv.pem"),
66//! listen_addr: "127.0.0.1:8105".parse().unwrap(),
67//! send_addr: None,
68//! peer_endpoint: "127.0.0.1:8105".parse().unwrap(),
69//! buffer_size: 16 * 1024,
70//! header_size: 1024,
71//! encrypt: true,
72//! };
73//! drop(cfg);
74//! ```
75
76pub mod driver;
77pub mod receive;
78pub mod send;
79pub mod util;
80
81use std::io;
82use std::net::SocketAddr;
83use std::path::PathBuf;
84use std::sync::Arc;
85
86use thiserror::Error;
87
88pub use crate::entropy::receive::EntropyReceiver;
89pub use crate::entropy::send::{EntropySender, RedisLocalSnapshot};
90pub use crate::entropy::util::{
91 EntropyIv, EntropyKey, EntropyMaterial, ENTROPY_IV_LEN, ENTROPY_KEY_LEN,
92};
93
94/// Box a [`SnapshotSource`] into the [`BoxedSnapshotSource`] alias
95/// expected by [`EntropySender::run`].
96///
97/// # Examples
98///
99/// ```
100/// use dynomite::entropy::{boxed_source, send::StaticSnapshot, SnapshotSource};
101/// let s = boxed_source(StaticSnapshot::new(vec![1, 2, 3]));
102/// assert_eq!(s.snapshot().unwrap(), vec![1, 2, 3]);
103/// ```
104#[must_use]
105pub fn boxed_source<S: SnapshotSource + 'static>(source: S) -> BoxedSnapshotSource {
106 Arc::new(source)
107}
108
109/// Box a [`SnapshotSink`] into the [`BoxedSnapshotSink`] alias
110/// expected by [`EntropyReceiver::run`].
111///
112/// # Examples
113///
114/// ```
115/// use dynomite::entropy::{boxed_sink, receive::MemorySink, SnapshotSink};
116/// let sink = boxed_sink(MemorySink::default());
117/// sink.apply(b"hi").unwrap();
118/// ```
119#[must_use]
120pub fn boxed_sink<S: SnapshotSink + 'static>(sink: S) -> BoxedSnapshotSink {
121 Arc::new(sink)
122}
123
124/// Magic word that opens every entropy negotiation header.
125///
126/// Mirrors `MAGIC_NUMBER` (`64640001`) defined inline in the
127/// reference engine's entropy utility.
128pub const ENTROPY_MAGIC: u32 = 0x6464_0001;
129
130/// Negotiation command: sender pushes a snapshot to the receiver.
131pub const ENTROPY_COMMAND_SEND: u32 = 1;
132
133/// Default plaintext chunk size, in bytes. Matches the reference
134/// engine's `BUFFER_SIZE` (16 KiB).
135pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024;
136
137/// Default ciphertext chunk capacity, in bytes. The reference
138/// engine reserves an extra 1 KiB above the plaintext buffer to
139/// hold cipher overhead; we mirror that headroom, which is far
140/// more than PKCS#7 needs.
141pub const DEFAULT_CIPHER_SIZE: usize = DEFAULT_BUFFER_SIZE + 1024;
142
143/// Default snapshot header size, in bytes. Matches the reference
144/// engine's `MAX_HEADER_SIZE`.
145pub const DEFAULT_HEADER_SIZE: usize = 1024;
146
147/// Hard ceiling on the negotiated `header_size`, in bytes.
148///
149/// Mirrors the reference engine's `MAX_HEADER_SIZE` validation.
150pub const MAX_HEADER_SIZE: usize = 1024;
151
152/// Hard ceiling on the negotiated `buffer_size`, in bytes (5 MiB).
153///
154/// Mirrors the reference engine's `MAX_BUFFER_SIZE` validation.
155pub const MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024;
156
157/// Hard ceiling on the negotiated `cipher_size`, in bytes (5 MiB).
158///
159/// Mirrors the reference engine's `MAX_CIPHER_SIZE` validation.
160pub const MAX_CIPHER_SIZE: usize = 5 * 1024 * 1024;
161
162/// Hard ceiling on a single snapshot's plaintext size, in bytes (4 GiB).
163///
164/// The reference engine does not bound this explicitly because its
165/// receiver consumes per-key chunks and never allocates the whole
166/// snapshot in one call. The Rust port stages every chunk into a
167/// `Vec<u8>` for replay and so MUST cap the upfront allocation. A
168/// malicious sender that completes the negotiation handshake could
169/// otherwise declare `total_len = u32::MAX` and trigger a 4 GiB
170/// allocation attempt before any payload bytes arrive. The cap below
171/// is a generous practical ceiling (4 GiB minus 1) that still lets
172/// large RDB snapshots through; embedders can plug their own
173/// `SnapshotSink` to apply tighter bounds.
174pub const MAX_SNAPSHOT_SIZE: usize = u32::MAX as usize - 1;
175
176/// Cap the receiver's pre-allocation hint to avoid a malicious or
177/// malformed `total_len` triggering an oversized allocation up
178/// front. Real RDB snapshots stream chunk-by-chunk; the receiver
179/// reallocates as plaintext arrives if the snapshot is genuinely
180/// larger than the hint.
181pub const SAFE_PREALLOC: usize = 16 * 1024 * 1024;
182
183/// Operator-facing configuration for the entropy worker.
184///
185/// # Examples
186///
187/// ```
188/// use std::path::PathBuf;
189/// use dynomite::entropy::EntropyConfig;
190/// let cfg = EntropyConfig {
191/// key_file: PathBuf::from("conf/recon_key.pem"),
192/// iv_file: PathBuf::from("conf/recon_iv.pem"),
193/// listen_addr: "127.0.0.1:8105".parse().unwrap(),
194/// send_addr: None,
195/// peer_endpoint: "127.0.0.1:8105".parse().unwrap(),
196/// buffer_size: 16 * 1024,
197/// header_size: 1024,
198/// encrypt: true,
199/// };
200/// assert_eq!(cfg.buffer_size, 16 * 1024);
201/// ```
202#[derive(Debug, Clone)]
203pub struct EntropyConfig {
204 /// On-disk path to the AES-128 key file.
205 pub key_file: PathBuf,
206 /// On-disk path to the AES-128-CBC IV file.
207 pub iv_file: PathBuf,
208 /// Address the [`EntropyReceiver`] binds to.
209 pub listen_addr: SocketAddr,
210 /// Optional local bind address for the [`EntropySender`]. When
211 /// `None` the kernel assigns an ephemeral port on the wildcard
212 /// address.
213 pub send_addr: Option<SocketAddr>,
214 /// Address the [`EntropySender`] dials.
215 pub peer_endpoint: SocketAddr,
216 /// Plaintext chunk size (bytes). Must be a multiple of 16 when
217 /// encryption is enabled and must not exceed
218 /// [`MAX_BUFFER_SIZE`].
219 pub buffer_size: usize,
220 /// Snapshot header size (bytes). Must not exceed
221 /// [`MAX_HEADER_SIZE`].
222 pub header_size: usize,
223 /// Whether per-chunk payloads are AES-128-CBC encrypted.
224 pub encrypt: bool,
225}
226
227impl EntropyConfig {
228 /// Validate the cross-field invariants the protocol demands.
229 ///
230 /// # Errors
231 /// [`EntropyError::Config`] when any invariant is violated.
232 ///
233 /// # Examples
234 ///
235 /// ```
236 /// use std::path::PathBuf;
237 /// use dynomite::entropy::EntropyConfig;
238 /// let mut cfg = EntropyConfig {
239 /// key_file: PathBuf::from("k"),
240 /// iv_file: PathBuf::from("v"),
241 /// listen_addr: "127.0.0.1:0".parse().unwrap(),
242 /// send_addr: None,
243 /// peer_endpoint: "127.0.0.1:0".parse().unwrap(),
244 /// buffer_size: 32,
245 /// header_size: 64,
246 /// encrypt: true,
247 /// };
248 /// assert!(cfg.validate().is_ok());
249 /// cfg.buffer_size = 0;
250 /// assert!(cfg.validate().is_err());
251 /// ```
252 pub fn validate(&self) -> Result<(), EntropyError> {
253 if self.buffer_size == 0 || self.buffer_size > MAX_BUFFER_SIZE {
254 return Err(EntropyError::Config(format!(
255 "buffer_size {} out of range (1..={MAX_BUFFER_SIZE})",
256 self.buffer_size
257 )));
258 }
259 if self.header_size < 8 || self.header_size > MAX_HEADER_SIZE {
260 return Err(EntropyError::Config(format!(
261 "header_size {} out of range (8..={MAX_HEADER_SIZE})",
262 self.header_size
263 )));
264 }
265 if self.encrypt && !self.buffer_size.is_multiple_of(16) {
266 return Err(EntropyError::Config(format!(
267 "buffer_size {} must be a multiple of 16 with encryption enabled",
268 self.buffer_size
269 )));
270 }
271 Ok(())
272 }
273}
274
275/// Snapshot byte source.
276///
277/// Implementations are typically pluggable via the Stage 13
278/// embedding API; the engine ships [`RedisLocalSnapshot`] as the
279/// default. Implementations are expected to be cheap to clone
280/// (e.g. shared via [`Arc`]) but each call to [`snapshot`] may
281/// produce a different blob.
282///
283/// [`snapshot`]: SnapshotSource::snapshot
284pub trait SnapshotSource: Send + Sync {
285 /// Produce one snapshot of the local state as a contiguous
286 /// byte buffer. The sender treats the bytes as opaque; the
287 /// receiver replays them through its [`SnapshotSink`].
288 ///
289 /// # Errors
290 /// Implementation-defined.
291 fn snapshot(&self) -> Result<Vec<u8>, EntropyError>;
292}
293
294/// Boxed [`SnapshotSource`] handed to [`EntropySender`].
295pub type BoxedSnapshotSource = Arc<dyn SnapshotSource>;
296
297impl<T> SnapshotSource for Arc<T>
298where
299 T: SnapshotSource + ?Sized,
300{
301 fn snapshot(&self) -> Result<Vec<u8>, EntropyError> {
302 (**self).snapshot()
303 }
304}
305
306/// Receiver-side hook that consumes the decrypted snapshot.
307///
308/// Implementations are typically pluggable via the Stage 13
309/// embedding API. The engine ships an in-memory implementation
310/// for tests and the [`receive::RedisReplaySink`] default for
311/// production wiring.
312pub trait SnapshotSink: Send + Sync {
313 /// Apply the full plaintext snapshot to the local datastore.
314 /// Called exactly once per connection.
315 ///
316 /// # Errors
317 /// Implementation-defined.
318 fn apply(&self, snapshot: &[u8]) -> Result<(), EntropyError>;
319}
320
321/// Boxed [`SnapshotSink`] handed to [`EntropyReceiver`].
322pub type BoxedSnapshotSink = Arc<dyn SnapshotSink>;
323
324impl<T> SnapshotSink for Arc<T>
325where
326 T: SnapshotSink + ?Sized,
327{
328 fn apply(&self, snapshot: &[u8]) -> Result<(), EntropyError> {
329 (**self).apply(snapshot)
330 }
331}
332
333/// Errors raised by the entropy module.
334#[derive(Debug, Error)]
335pub enum EntropyError {
336 /// I/O or socket failure.
337 #[error("entropy io: {0}")]
338 Io(#[from] io::Error),
339 /// Invalid configuration.
340 #[error("entropy config: {0}")]
341 Config(String),
342 /// Key or IV material is missing or malformed.
343 #[error("entropy key material: {0}")]
344 KeyMaterial(String),
345 /// Wire-protocol violation.
346 #[error("entropy protocol: {0}")]
347 Protocol(String),
348 /// AES-CBC decryption failure (bad padding or wrong key).
349 #[error("entropy crypto: {0}")]
350 Crypto(String),
351 /// Snapshot source produced an error.
352 #[error("entropy source: {0}")]
353 Source(String),
354 /// Snapshot sink rejected the replay.
355 #[error("entropy sink: {0}")]
356 Sink(String),
357}
358
359/// Convenience type alias.
360pub type EntropyResult<T> = Result<T, EntropyError>;
361
362/// Negotiation header that opens every entropy connection.
363///
364/// All five fields are 32-bit big-endian unsigned integers on the
365/// wire.
366#[derive(Debug, Clone, Copy, Eq, PartialEq)]
367pub struct NegotiationHeader {
368 /// Magic word; must equal [`ENTROPY_MAGIC`].
369 pub magic: u32,
370 /// Direction marker. [`ENTROPY_COMMAND_SEND`] = data flows
371 /// from sender to receiver.
372 pub command: u32,
373 /// Snapshot-header size, in bytes.
374 pub header_size: u32,
375 /// Plaintext chunk size, in bytes.
376 pub buffer_size: u32,
377 /// Ciphertext chunk capacity, in bytes.
378 pub cipher_size: u32,
379}
380
381impl NegotiationHeader {
382 /// Wire size in bytes.
383 pub const SIZE: usize = 5 * 4;
384
385 /// Encode the header to wire bytes.
386 #[must_use]
387 pub fn to_wire(self) -> [u8; Self::SIZE] {
388 let mut out = [0u8; Self::SIZE];
389 out[0..4].copy_from_slice(&self.magic.to_be_bytes());
390 out[4..8].copy_from_slice(&self.command.to_be_bytes());
391 out[8..12].copy_from_slice(&self.header_size.to_be_bytes());
392 out[12..16].copy_from_slice(&self.buffer_size.to_be_bytes());
393 out[16..20].copy_from_slice(&self.cipher_size.to_be_bytes());
394 out
395 }
396
397 /// Decode and validate a wire-format negotiation header.
398 ///
399 /// # Errors
400 /// [`EntropyError::Protocol`] if any field is out of range or
401 /// the magic word is wrong.
402 pub fn from_wire(bytes: &[u8; Self::SIZE]) -> Result<Self, EntropyError> {
403 let magic = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
404 let command = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
405 let header_size = u32::from_be_bytes(bytes[8..12].try_into().unwrap());
406 let buffer_size = u32::from_be_bytes(bytes[12..16].try_into().unwrap());
407 let cipher_size = u32::from_be_bytes(bytes[16..20].try_into().unwrap());
408 if magic != ENTROPY_MAGIC {
409 return Err(EntropyError::Protocol(format!(
410 "bad magic 0x{magic:08x}, expected 0x{ENTROPY_MAGIC:08x}"
411 )));
412 }
413 if command != ENTROPY_COMMAND_SEND {
414 return Err(EntropyError::Protocol(format!(
415 "unsupported command {command}"
416 )));
417 }
418 if header_size < 8 || header_size as usize > MAX_HEADER_SIZE {
419 return Err(EntropyError::Protocol(format!(
420 "header_size {header_size} out of range"
421 )));
422 }
423 if buffer_size == 0 || buffer_size as usize > MAX_BUFFER_SIZE {
424 return Err(EntropyError::Protocol(format!(
425 "buffer_size {buffer_size} out of range"
426 )));
427 }
428 if cipher_size == 0 || cipher_size as usize > MAX_CIPHER_SIZE {
429 return Err(EntropyError::Protocol(format!(
430 "cipher_size {cipher_size} out of range"
431 )));
432 }
433 Ok(Self {
434 magic,
435 command,
436 header_size,
437 buffer_size,
438 cipher_size,
439 })
440 }
441}
442
443/// Per-snapshot header carried inside the variable-sized header
444/// region declared by the negotiation step.
445///
446/// The first eight bytes are two big-endian `u32`s: the total
447/// plaintext length and the encrypt flag. The remaining bytes are
448/// reserved-for-future-use and must be transmitted as zero.
449#[derive(Debug, Clone, Copy, Eq, PartialEq)]
450pub struct SnapshotHeader {
451 /// Total plaintext length, in bytes.
452 pub total_len: u32,
453 /// `1` when chunks are AES-128-CBC encrypted, `0` otherwise.
454 pub encrypt_flag: u32,
455}
456
457impl SnapshotHeader {
458 /// Encode into a `header_size`-byte buffer.
459 ///
460 /// # Errors
461 /// [`EntropyError::Config`] if `header_size` is shorter than
462 /// the eight-byte fixed prefix.
463 pub fn to_wire(self, header_size: usize) -> Result<Vec<u8>, EntropyError> {
464 if header_size < 8 {
465 return Err(EntropyError::Config(format!(
466 "header_size {header_size} smaller than fixed 8-byte prefix"
467 )));
468 }
469 let mut out = vec![0u8; header_size];
470 out[0..4].copy_from_slice(&self.total_len.to_be_bytes());
471 out[4..8].copy_from_slice(&self.encrypt_flag.to_be_bytes());
472 Ok(out)
473 }
474
475 /// Decode from a `header_size`-byte buffer.
476 ///
477 /// # Errors
478 /// [`EntropyError::Protocol`] if the buffer is shorter than
479 /// the fixed prefix or carries an unknown `encrypt_flag`.
480 pub fn from_wire(bytes: &[u8]) -> Result<Self, EntropyError> {
481 if bytes.len() < 8 {
482 return Err(EntropyError::Protocol(format!(
483 "snapshot header too short ({} bytes)",
484 bytes.len()
485 )));
486 }
487 let total_len = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
488 let encrypt_flag = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
489 if encrypt_flag > 1 {
490 return Err(EntropyError::Protocol(format!(
491 "unknown encrypt_flag {encrypt_flag}"
492 )));
493 }
494 Ok(Self {
495 total_len,
496 encrypt_flag,
497 })
498 }
499}
500
501#[cfg(test)]
502mod tests {
503 use super::*;
504
505 #[test]
506 fn negotiation_header_round_trips() {
507 let hdr = NegotiationHeader {
508 magic: ENTROPY_MAGIC,
509 command: ENTROPY_COMMAND_SEND,
510 header_size: 1024,
511 buffer_size: 16 * 1024,
512 cipher_size: 17 * 1024,
513 };
514 let wire = hdr.to_wire();
515 let parsed = NegotiationHeader::from_wire(&wire).unwrap();
516 assert_eq!(parsed, hdr);
517 }
518
519 #[test]
520 fn negotiation_header_rejects_bad_magic() {
521 let mut wire = NegotiationHeader {
522 magic: 0xdead_beef,
523 command: ENTROPY_COMMAND_SEND,
524 header_size: 1024,
525 buffer_size: 16 * 1024,
526 cipher_size: 17 * 1024,
527 }
528 .to_wire();
529 // Force-write the bad magic in case the constructor ever changes.
530 wire[0..4].copy_from_slice(&0xdead_beefu32.to_be_bytes());
531 let err = NegotiationHeader::from_wire(&wire).unwrap_err();
532 assert!(matches!(err, EntropyError::Protocol(_)));
533 }
534
535 #[test]
536 fn snapshot_header_round_trips() {
537 let hdr = SnapshotHeader {
538 total_len: 4096,
539 encrypt_flag: 1,
540 };
541 let wire = hdr.to_wire(64).unwrap();
542 assert_eq!(wire.len(), 64);
543 for byte in &wire[8..] {
544 assert_eq!(*byte, 0);
545 }
546 let parsed = SnapshotHeader::from_wire(&wire).unwrap();
547 assert_eq!(parsed, hdr);
548 }
549
550 #[test]
551 fn snapshot_header_rejects_bad_flag() {
552 let mut wire = vec![0u8; 16];
553 wire[4..8].copy_from_slice(&5u32.to_be_bytes());
554 let err = SnapshotHeader::from_wire(&wire).unwrap_err();
555 assert!(matches!(err, EntropyError::Protocol(_)));
556 }
557
558 #[test]
559 fn config_validate_rejects_zero_buffer() {
560 let cfg = EntropyConfig {
561 key_file: PathBuf::from("k"),
562 iv_file: PathBuf::from("v"),
563 listen_addr: "127.0.0.1:0".parse().unwrap(),
564 send_addr: None,
565 peer_endpoint: "127.0.0.1:0".parse().unwrap(),
566 buffer_size: 0,
567 header_size: 64,
568 encrypt: true,
569 };
570 assert!(cfg.validate().is_err());
571 }
572}