Skip to main content

dynomite/entropy/
mod.rs

1//! Entropy reconciliation channel.
2//!
3//! The entropy module ships a snapshot of the local datastore to a
4//! peer reconciliation engine (and vice versa) over a TCP
5//! connection. Each chunk of the snapshot is encrypted with
6//! AES-128-CBC using a pre-shared key and IV loaded from the
7//! conf-configured `recon_key.pem` and `recon_iv.pem` files. The
8//! key and IV used here are independent from the per-peer
9//! AES session key established during the DNODE handshake; the
10//! entropy channel uses its own pre-shared material.
11//!
12//! # Layout
13//!
14//! * [`util`] holds the on-disk key/IV loaders and validated
15//!   wrappers.
16//! * [`send`] drives the client side: connect, write the
17//!   negotiation header, stream the snapshot.
18//! * [`receive`] drives the server side: bind, accept, replay the
19//!   plaintext into a [`SnapshotSink`].
20//! * The top-level [`EntropyConfig`] gathers the operator-visible
21//!   knobs.
22//!
23//! # Wire format
24//!
25//! The reference engine's reconciliation protocol consists of two
26//! sub-protocols (a file-stream `snd` path and a per-record `rcv`
27//! path) that are not inverses of each other. The Rust port uses a
28//! single, length-prefixed framing that subsumes the `snd` shape
29//! and is symmetric across sender and receiver. See
30//! [`docs/parity.md`](../../../docs/parity.md) for the precise
31//! divergence list.
32//!
33//! ```text
34//! Negotiation header (20 bytes, big-endian u32 fields):
35//!     magic        = 0x64640001
36//!     command      = 1   (SEND: data flowing sender -> receiver)
37//!     header_size  = N   (size in bytes of the snapshot header below)
38//!     buffer_size  = M   (max plaintext bytes per chunk)
39//!     cipher_size  = C   (max ciphertext bytes per chunk)
40//!
41//! Snapshot header (header_size bytes, big-endian u32 fields then
42//! zero-padded):
43//!     total_len    = L   (total plaintext length in bytes)
44//!     encrypt_flag = 0|1 (1 if chunks are AES-128-CBC encrypted)
45//!     <header_size - 8 bytes of zero padding>
46//!
47//! Chunks (repeated until L plaintext bytes have been delivered):
48//!     u32 BE chunk_len
49//!     <chunk_len bytes of payload>
50//!
51//! When encrypt_flag is set each payload is the AES-128-CBC
52//! ciphertext of the matching plaintext chunk, PKCS#7-padded to a
53//! 16-byte boundary; the receiver strips the padding before
54//! delivering bytes to its sink.
55//! ```
56//!
57//! # Example
58//!
59//! ```no_run
60//! use std::path::PathBuf;
61//! use dynomite::entropy::{EntropyConfig, EntropyReceiver, EntropySender};
62//!
63//! let cfg = EntropyConfig {
64//!     key_file: PathBuf::from("/etc/dynomite/recon_key.pem"),
65//!     iv_file: PathBuf::from("/etc/dynomite/recon_iv.pem"),
66//!     listen_addr: "127.0.0.1:8105".parse().unwrap(),
67//!     send_addr: None,
68//!     peer_endpoint: "127.0.0.1:8105".parse().unwrap(),
69//!     buffer_size: 16 * 1024,
70//!     header_size: 1024,
71//!     encrypt: true,
72//! };
73//! drop(cfg);
74//! ```
75
76pub mod driver;
77pub mod receive;
78pub mod send;
79pub mod util;
80
81use std::io;
82use std::net::SocketAddr;
83use std::path::PathBuf;
84use std::sync::Arc;
85
86use thiserror::Error;
87
88pub use crate::entropy::receive::EntropyReceiver;
89pub use crate::entropy::send::{EntropySender, RedisLocalSnapshot};
90pub use crate::entropy::util::{
91    EntropyIv, EntropyKey, EntropyMaterial, ENTROPY_IV_LEN, ENTROPY_KEY_LEN,
92};
93
94/// Box a [`SnapshotSource`] into the [`BoxedSnapshotSource`] alias
95/// expected by [`EntropySender::run`].
96///
97/// # Examples
98///
99/// ```
100/// use dynomite::entropy::{boxed_source, send::StaticSnapshot, SnapshotSource};
101/// let s = boxed_source(StaticSnapshot::new(vec![1, 2, 3]));
102/// assert_eq!(s.snapshot().unwrap(), vec![1, 2, 3]);
103/// ```
104#[must_use]
105pub fn boxed_source<S: SnapshotSource + 'static>(source: S) -> BoxedSnapshotSource {
106    Arc::new(source)
107}
108
109/// Box a [`SnapshotSink`] into the [`BoxedSnapshotSink`] alias
110/// expected by [`EntropyReceiver::run`].
111///
112/// # Examples
113///
114/// ```
115/// use dynomite::entropy::{boxed_sink, receive::MemorySink, SnapshotSink};
116/// let sink = boxed_sink(MemorySink::default());
117/// sink.apply(b"hi").unwrap();
118/// ```
119#[must_use]
120pub fn boxed_sink<S: SnapshotSink + 'static>(sink: S) -> BoxedSnapshotSink {
121    Arc::new(sink)
122}
123
/// Magic word that opens every entropy negotiation header.
///
/// Mirrors `MAGIC_NUMBER` (`64640001`) defined inline in the
/// reference engine's entropy utility.
/// [`NegotiationHeader::from_wire`] rejects a header whose first
/// field is any other value.
pub const ENTROPY_MAGIC: u32 = 0x6464_0001;

/// Negotiation command: sender pushes a snapshot to the receiver.
///
/// This is the only command [`NegotiationHeader::from_wire`]
/// accepts.
pub const ENTROPY_COMMAND_SEND: u32 = 1;

/// Default plaintext chunk size, in bytes. Matches the reference
/// engine's `BUFFER_SIZE` (16 KiB).
///
/// The value is a multiple of 16, so it passes
/// [`EntropyConfig::validate`] with encryption enabled.
pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024;

/// Default ciphertext chunk capacity, in bytes (17 KiB). The
/// reference engine reserves an extra 1 KiB above the plaintext
/// buffer to hold cipher overhead; we mirror that headroom, which
/// is far more than PKCS#7 needs.
pub const DEFAULT_CIPHER_SIZE: usize = DEFAULT_BUFFER_SIZE + 1024;

/// Default snapshot header size, in bytes. Matches the reference
/// engine's `MAX_HEADER_SIZE`, and therefore equals this module's
/// own [`MAX_HEADER_SIZE`] ceiling.
pub const DEFAULT_HEADER_SIZE: usize = 1024;

/// Hard ceiling on the negotiated `header_size`, in bytes.
///
/// Mirrors the reference engine's `MAX_HEADER_SIZE` validation.
/// Enforced by both [`EntropyConfig::validate`] and
/// [`NegotiationHeader::from_wire`].
pub const MAX_HEADER_SIZE: usize = 1024;

/// Hard ceiling on the negotiated `buffer_size`, in bytes (5 MiB).
///
/// Mirrors the reference engine's `MAX_BUFFER_SIZE` validation.
/// Enforced by both [`EntropyConfig::validate`] and
/// [`NegotiationHeader::from_wire`].
pub const MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024;

/// Hard ceiling on the negotiated `cipher_size`, in bytes (5 MiB).
///
/// Mirrors the reference engine's `MAX_CIPHER_SIZE` validation.
/// Enforced by [`NegotiationHeader::from_wire`].
pub const MAX_CIPHER_SIZE: usize = 5 * 1024 * 1024;
161
/// Hard ceiling on a single snapshot's plaintext size, in bytes
/// (`u32::MAX - 1`, i.e. two bytes short of 4 GiB).
///
/// The reference engine does not bound this explicitly because its
/// receiver consumes per-key chunks and never allocates the whole
/// snapshot in one call. The Rust port stages every chunk into a
/// `Vec<u8>` for replay and so MUST cap the upfront allocation. A
/// malicious sender that completes the negotiation handshake could
/// otherwise declare `total_len = u32::MAX` and trigger a 4 GiB
/// allocation attempt before any payload bytes arrive. The cap below
/// is a generous practical ceiling that still lets large RDB
/// snapshots through; embedders can plug their own `SnapshotSink`
/// to apply tighter bounds.
//
// NOTE(review): `total_len` is a `u32` on the wire, so this ceiling
// excludes only the single value `u32::MAX`; a declared length of
// `u32::MAX - 1` still passes. Presumably the effective allocation
// guard is `SAFE_PREALLOC` below -- confirm against the receiver.
pub const MAX_SNAPSHOT_SIZE: usize = u32::MAX as usize - 1;

/// Cap (16 MiB) on the receiver's pre-allocation hint, to avoid a
/// malicious or malformed `total_len` triggering an oversized
/// allocation up front. Real RDB snapshots stream chunk-by-chunk;
/// the receiver reallocates as plaintext arrives if the snapshot is
/// genuinely larger than the hint.
pub const SAFE_PREALLOC: usize = 16 * 1024 * 1024;
182
/// Operator-facing configuration for the entropy worker.
///
/// All fields are public and building the struct performs no
/// checks; call [`EntropyConfig::validate`] to verify the size
/// constraints documented on the individual fields.
///
/// # Examples
///
/// ```
/// use std::path::PathBuf;
/// use dynomite::entropy::EntropyConfig;
/// let cfg = EntropyConfig {
///     key_file: PathBuf::from("conf/recon_key.pem"),
///     iv_file: PathBuf::from("conf/recon_iv.pem"),
///     listen_addr: "127.0.0.1:8105".parse().unwrap(),
///     send_addr: None,
///     peer_endpoint: "127.0.0.1:8105".parse().unwrap(),
///     buffer_size: 16 * 1024,
///     header_size: 1024,
///     encrypt: true,
/// };
/// assert_eq!(cfg.buffer_size, 16 * 1024);
/// ```
#[derive(Debug, Clone)]
pub struct EntropyConfig {
    /// On-disk path to the AES-128 key file.
    pub key_file: PathBuf,
    /// On-disk path to the AES-128-CBC IV file.
    pub iv_file: PathBuf,
    /// Address the [`EntropyReceiver`] binds to.
    pub listen_addr: SocketAddr,
    /// Optional local bind address for the [`EntropySender`]. When
    /// `None` the kernel assigns an ephemeral port on the wildcard
    /// address.
    pub send_addr: Option<SocketAddr>,
    /// Address the [`EntropySender`] dials.
    pub peer_endpoint: SocketAddr,
    /// Plaintext chunk size (bytes). Must be non-zero, must not
    /// exceed [`MAX_BUFFER_SIZE`], and must be a multiple of 16
    /// when encryption is enabled.
    pub buffer_size: usize,
    /// Snapshot header size (bytes). Must be at least 8 (the fixed
    /// `total_len` + `encrypt_flag` prefix) and must not exceed
    /// [`MAX_HEADER_SIZE`].
    pub header_size: usize,
    /// Whether per-chunk payloads are AES-128-CBC encrypted.
    pub encrypt: bool,
}
226
227impl EntropyConfig {
228    /// Validate the cross-field invariants the protocol demands.
229    ///
230    /// # Errors
231    /// [`EntropyError::Config`] when any invariant is violated.
232    ///
233    /// # Examples
234    ///
235    /// ```
236    /// use std::path::PathBuf;
237    /// use dynomite::entropy::EntropyConfig;
238    /// let mut cfg = EntropyConfig {
239    ///     key_file: PathBuf::from("k"),
240    ///     iv_file: PathBuf::from("v"),
241    ///     listen_addr: "127.0.0.1:0".parse().unwrap(),
242    ///     send_addr: None,
243    ///     peer_endpoint: "127.0.0.1:0".parse().unwrap(),
244    ///     buffer_size: 32,
245    ///     header_size: 64,
246    ///     encrypt: true,
247    /// };
248    /// assert!(cfg.validate().is_ok());
249    /// cfg.buffer_size = 0;
250    /// assert!(cfg.validate().is_err());
251    /// ```
252    pub fn validate(&self) -> Result<(), EntropyError> {
253        if self.buffer_size == 0 || self.buffer_size > MAX_BUFFER_SIZE {
254            return Err(EntropyError::Config(format!(
255                "buffer_size {} out of range (1..={MAX_BUFFER_SIZE})",
256                self.buffer_size
257            )));
258        }
259        if self.header_size < 8 || self.header_size > MAX_HEADER_SIZE {
260            return Err(EntropyError::Config(format!(
261                "header_size {} out of range (8..={MAX_HEADER_SIZE})",
262                self.header_size
263            )));
264        }
265        if self.encrypt && !self.buffer_size.is_multiple_of(16) {
266            return Err(EntropyError::Config(format!(
267                "buffer_size {} must be a multiple of 16 with encryption enabled",
268                self.buffer_size
269            )));
270        }
271        Ok(())
272    }
273}
274
/// Snapshot byte source.
///
/// Implementations are typically pluggable via the Stage 13
/// embedding API; the engine ships [`RedisLocalSnapshot`] as the
/// default. Implementations are expected to be cheap to clone
/// (e.g. shared via [`Arc`]) but each call to [`snapshot`] may
/// produce a different blob.
///
/// The `Send + Sync` supertraits are what allow a source to be
/// held as a [`BoxedSnapshotSource`] and shared across threads.
///
/// [`snapshot`]: SnapshotSource::snapshot
pub trait SnapshotSource: Send + Sync {
    /// Produce one snapshot of the local state as a contiguous
    /// byte buffer. The sender treats the bytes as opaque; the
    /// receiver replays them through its [`SnapshotSink`].
    ///
    /// Takes `&self`, so implementations that need to mutate state
    /// must use interior mutability.
    ///
    /// # Errors
    /// Implementation-defined.
    fn snapshot(&self) -> Result<Vec<u8>, EntropyError>;
}

/// Shared, type-erased [`SnapshotSource`] handed to
/// [`EntropySender`]. Despite the `Boxed` prefix this is an
/// [`Arc`], not a `Box`; build one with [`boxed_source`].
pub type BoxedSnapshotSource = Arc<dyn SnapshotSource>;
296
297impl<T> SnapshotSource for Arc<T>
298where
299    T: SnapshotSource + ?Sized,
300{
301    fn snapshot(&self) -> Result<Vec<u8>, EntropyError> {
302        (**self).snapshot()
303    }
304}
305
/// Receiver-side hook that consumes the decrypted snapshot.
///
/// Implementations are typically pluggable via the Stage 13
/// embedding API. The engine ships an in-memory implementation
/// for tests and the [`receive::RedisReplaySink`] default for
/// production wiring.
///
/// The `Send + Sync` supertraits are what allow a sink to be held
/// as a [`BoxedSnapshotSink`] and shared across threads.
pub trait SnapshotSink: Send + Sync {
    /// Apply the full plaintext snapshot to the local datastore.
    /// Called exactly once per connection.
    ///
    /// Takes `&self`, so implementations that accumulate state
    /// must use interior mutability.
    ///
    /// # Errors
    /// Implementation-defined.
    fn apply(&self, snapshot: &[u8]) -> Result<(), EntropyError>;
}

/// Shared, type-erased [`SnapshotSink`] handed to
/// [`EntropyReceiver`]. Despite the `Boxed` prefix this is an
/// [`Arc`], not a `Box`; build one with [`boxed_sink`].
pub type BoxedSnapshotSink = Arc<dyn SnapshotSink>;
323
324impl<T> SnapshotSink for Arc<T>
325where
326    T: SnapshotSink + ?Sized,
327{
328    fn apply(&self, snapshot: &[u8]) -> Result<(), EntropyError> {
329        (**self).apply(snapshot)
330    }
331}
332
/// Errors raised by the entropy module.
///
/// Every variant except [`EntropyError::Io`] carries a
/// human-readable message; the `Display` output prefixes it with
/// the category shown in each variant's `#[error]` attribute.
#[derive(Debug, Error)]
pub enum EntropyError {
    /// I/O or socket failure. Converts from [`io::Error`] via
    /// `From`, so `?` works directly on I/O results.
    #[error("entropy io: {0}")]
    Io(#[from] io::Error),
    /// Invalid configuration. Returned by
    /// [`EntropyConfig::validate`] and [`SnapshotHeader::to_wire`].
    #[error("entropy config: {0}")]
    Config(String),
    /// Key or IV material is missing or malformed.
    #[error("entropy key material: {0}")]
    KeyMaterial(String),
    /// Wire-protocol violation. Returned by
    /// [`NegotiationHeader::from_wire`] and
    /// [`SnapshotHeader::from_wire`].
    #[error("entropy protocol: {0}")]
    Protocol(String),
    /// AES-CBC decryption failure (bad padding or wrong key).
    #[error("entropy crypto: {0}")]
    Crypto(String),
    /// Snapshot source produced an error.
    #[error("entropy source: {0}")]
    Source(String),
    /// Snapshot sink rejected the replay.
    #[error("entropy sink: {0}")]
    Sink(String),
}

/// Convenience alias for `Result<T, EntropyError>`.
pub type EntropyResult<T> = Result<T, EntropyError>;
361
/// Negotiation header that opens every entropy connection.
///
/// All five fields are 32-bit big-endian unsigned integers on the
/// wire, in declaration order.
///
/// Building the struct directly performs no validation; the range
/// and magic checks live in [`NegotiationHeader::from_wire`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct NegotiationHeader {
    /// Magic word; must equal [`ENTROPY_MAGIC`].
    pub magic: u32,
    /// Direction marker. [`ENTROPY_COMMAND_SEND`] = data flows
    /// from sender to receiver.
    pub command: u32,
    /// Snapshot-header size, in bytes. Accepted range on decode:
    /// `8..=`[`MAX_HEADER_SIZE`].
    pub header_size: u32,
    /// Plaintext chunk size, in bytes. Accepted range on decode:
    /// `1..=`[`MAX_BUFFER_SIZE`].
    pub buffer_size: u32,
    /// Ciphertext chunk capacity, in bytes. Accepted range on
    /// decode: `1..=`[`MAX_CIPHER_SIZE`].
    pub cipher_size: u32,
}
380
381impl NegotiationHeader {
382    /// Wire size in bytes.
383    pub const SIZE: usize = 5 * 4;
384
385    /// Encode the header to wire bytes.
386    #[must_use]
387    pub fn to_wire(self) -> [u8; Self::SIZE] {
388        let mut out = [0u8; Self::SIZE];
389        out[0..4].copy_from_slice(&self.magic.to_be_bytes());
390        out[4..8].copy_from_slice(&self.command.to_be_bytes());
391        out[8..12].copy_from_slice(&self.header_size.to_be_bytes());
392        out[12..16].copy_from_slice(&self.buffer_size.to_be_bytes());
393        out[16..20].copy_from_slice(&self.cipher_size.to_be_bytes());
394        out
395    }
396
397    /// Decode and validate a wire-format negotiation header.
398    ///
399    /// # Errors
400    /// [`EntropyError::Protocol`] if any field is out of range or
401    /// the magic word is wrong.
402    pub fn from_wire(bytes: &[u8; Self::SIZE]) -> Result<Self, EntropyError> {
403        let magic = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
404        let command = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
405        let header_size = u32::from_be_bytes(bytes[8..12].try_into().unwrap());
406        let buffer_size = u32::from_be_bytes(bytes[12..16].try_into().unwrap());
407        let cipher_size = u32::from_be_bytes(bytes[16..20].try_into().unwrap());
408        if magic != ENTROPY_MAGIC {
409            return Err(EntropyError::Protocol(format!(
410                "bad magic 0x{magic:08x}, expected 0x{ENTROPY_MAGIC:08x}"
411            )));
412        }
413        if command != ENTROPY_COMMAND_SEND {
414            return Err(EntropyError::Protocol(format!(
415                "unsupported command {command}"
416            )));
417        }
418        if header_size < 8 || header_size as usize > MAX_HEADER_SIZE {
419            return Err(EntropyError::Protocol(format!(
420                "header_size {header_size} out of range"
421            )));
422        }
423        if buffer_size == 0 || buffer_size as usize > MAX_BUFFER_SIZE {
424            return Err(EntropyError::Protocol(format!(
425                "buffer_size {buffer_size} out of range"
426            )));
427        }
428        if cipher_size == 0 || cipher_size as usize > MAX_CIPHER_SIZE {
429            return Err(EntropyError::Protocol(format!(
430                "cipher_size {cipher_size} out of range"
431            )));
432        }
433        Ok(Self {
434            magic,
435            command,
436            header_size,
437            buffer_size,
438            cipher_size,
439        })
440    }
441}
442
/// Per-snapshot header carried inside the variable-sized header
/// region declared by the negotiation step.
///
/// The first eight bytes are two big-endian `u32`s: the total
/// plaintext length and the encrypt flag. The remaining bytes are
/// reserved-for-future-use and must be transmitted as zero.
/// [`SnapshotHeader::to_wire`] writes them as zero;
/// [`SnapshotHeader::from_wire`] ignores them and does not verify
/// that they are zero.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SnapshotHeader {
    /// Total plaintext length, in bytes. Not range-checked by
    /// [`SnapshotHeader::from_wire`].
    // NOTE(review): presumably the receiver compares this against
    // `MAX_SNAPSHOT_SIZE` -- confirm in the receive path.
    pub total_len: u32,
    /// `1` when chunks are AES-128-CBC encrypted, `0` otherwise.
    /// Any other value is rejected on decode.
    pub encrypt_flag: u32,
}
456
457impl SnapshotHeader {
458    /// Encode into a `header_size`-byte buffer.
459    ///
460    /// # Errors
461    /// [`EntropyError::Config`] if `header_size` is shorter than
462    /// the eight-byte fixed prefix.
463    pub fn to_wire(self, header_size: usize) -> Result<Vec<u8>, EntropyError> {
464        if header_size < 8 {
465            return Err(EntropyError::Config(format!(
466                "header_size {header_size} smaller than fixed 8-byte prefix"
467            )));
468        }
469        let mut out = vec![0u8; header_size];
470        out[0..4].copy_from_slice(&self.total_len.to_be_bytes());
471        out[4..8].copy_from_slice(&self.encrypt_flag.to_be_bytes());
472        Ok(out)
473    }
474
475    /// Decode from a `header_size`-byte buffer.
476    ///
477    /// # Errors
478    /// [`EntropyError::Protocol`] if the buffer is shorter than
479    /// the fixed prefix or carries an unknown `encrypt_flag`.
480    pub fn from_wire(bytes: &[u8]) -> Result<Self, EntropyError> {
481        if bytes.len() < 8 {
482            return Err(EntropyError::Protocol(format!(
483                "snapshot header too short ({} bytes)",
484                bytes.len()
485            )));
486        }
487        let total_len = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
488        let encrypt_flag = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
489        if encrypt_flag > 1 {
490            return Err(EntropyError::Protocol(format!(
491                "unknown encrypt_flag {encrypt_flag}"
492            )));
493        }
494        Ok(Self {
495            total_len,
496            encrypt_flag,
497        })
498    }
499}
500
#[cfg(test)]
mod tests {
    use super::*;

    /// A negotiation header that passes every `from_wire` check.
    fn valid_negotiation() -> NegotiationHeader {
        NegotiationHeader {
            magic: ENTROPY_MAGIC,
            command: ENTROPY_COMMAND_SEND,
            header_size: 1024,
            buffer_size: 16 * 1024,
            cipher_size: 17 * 1024,
        }
    }

    /// A configuration that passes `EntropyConfig::validate`.
    fn valid_config() -> EntropyConfig {
        EntropyConfig {
            key_file: PathBuf::from("k"),
            iv_file: PathBuf::from("v"),
            listen_addr: "127.0.0.1:0".parse().unwrap(),
            send_addr: None,
            peer_endpoint: "127.0.0.1:0".parse().unwrap(),
            buffer_size: 32,
            header_size: 64,
            encrypt: true,
        }
    }

    /// Encode `hdr` and assert the decoder rejects it as a
    /// protocol violation.
    fn assert_negotiation_rejected(hdr: NegotiationHeader) {
        let err = NegotiationHeader::from_wire(&hdr.to_wire()).unwrap_err();
        assert!(
            matches!(err, EntropyError::Protocol(_)),
            "unexpected error for {hdr:?}: {err}"
        );
    }

    /// Encode `hdr` and assert the decoder returns it unchanged.
    fn assert_negotiation_accepted(hdr: NegotiationHeader) {
        let parsed = NegotiationHeader::from_wire(&hdr.to_wire()).unwrap();
        assert_eq!(parsed, hdr);
    }

    #[test]
    fn negotiation_header_round_trips() {
        let hdr = valid_negotiation();
        let wire = hdr.to_wire();
        assert_eq!(wire.len(), NegotiationHeader::SIZE);
        let parsed = NegotiationHeader::from_wire(&wire).unwrap();
        assert_eq!(parsed, hdr);
    }

    #[test]
    fn negotiation_header_rejects_bad_magic() {
        let mut wire = NegotiationHeader {
            magic: 0xdead_beef,
            ..valid_negotiation()
        }
        .to_wire();
        // Force-write the bad magic in case the constructor ever changes.
        wire[0..4].copy_from_slice(&0xdead_beefu32.to_be_bytes());
        let err = NegotiationHeader::from_wire(&wire).unwrap_err();
        assert!(matches!(err, EntropyError::Protocol(_)));
    }

    #[test]
    fn negotiation_header_rejects_unsupported_command() {
        for command in [0, ENTROPY_COMMAND_SEND + 1, u32::MAX] {
            assert_negotiation_rejected(NegotiationHeader {
                command,
                ..valid_negotiation()
            });
        }
    }

    #[test]
    fn negotiation_header_enforces_size_bounds() {
        let max_header = u32::try_from(MAX_HEADER_SIZE).unwrap();
        let max_buffer = u32::try_from(MAX_BUFFER_SIZE).unwrap();
        let max_cipher = u32::try_from(MAX_CIPHER_SIZE).unwrap();
        let base = valid_negotiation();

        // Values just outside each accepted range.
        for header_size in [0, 7, max_header + 1] {
            assert_negotiation_rejected(NegotiationHeader { header_size, ..base });
        }
        for buffer_size in [0, max_buffer + 1] {
            assert_negotiation_rejected(NegotiationHeader { buffer_size, ..base });
        }
        for cipher_size in [0, max_cipher + 1] {
            assert_negotiation_rejected(NegotiationHeader { cipher_size, ..base });
        }

        // Values exactly on each boundary.
        for header_size in [8, max_header] {
            assert_negotiation_accepted(NegotiationHeader { header_size, ..base });
        }
        for buffer_size in [1, max_buffer] {
            assert_negotiation_accepted(NegotiationHeader { buffer_size, ..base });
        }
        for cipher_size in [1, max_cipher] {
            assert_negotiation_accepted(NegotiationHeader { cipher_size, ..base });
        }
    }

    #[test]
    fn snapshot_header_round_trips() {
        let hdr = SnapshotHeader {
            total_len: 4096,
            encrypt_flag: 1,
        };
        let wire = hdr.to_wire(64).unwrap();
        assert_eq!(wire.len(), 64);
        for byte in &wire[8..] {
            assert_eq!(*byte, 0);
        }
        let parsed = SnapshotHeader::from_wire(&wire).unwrap();
        assert_eq!(parsed, hdr);
    }

    #[test]
    fn snapshot_header_rejects_bad_flag() {
        let mut wire = vec![0u8; 16];
        wire[4..8].copy_from_slice(&5u32.to_be_bytes());
        let err = SnapshotHeader::from_wire(&wire).unwrap_err();
        assert!(matches!(err, EntropyError::Protocol(_)));
    }

    #[test]
    fn snapshot_header_rejects_short_buffer() {
        for len in [0usize, 7] {
            let err = SnapshotHeader::from_wire(&vec![0u8; len]).unwrap_err();
            assert!(matches!(err, EntropyError::Protocol(_)));
        }
    }

    #[test]
    fn snapshot_header_to_wire_checks_header_size() {
        let hdr = SnapshotHeader {
            total_len: 1,
            encrypt_flag: 0,
        };
        let err = hdr.to_wire(7).unwrap_err();
        assert!(matches!(err, EntropyError::Config(_)));

        // The minimum size holds the prefix and no padding.
        let wire = hdr.to_wire(8).unwrap();
        assert_eq!(wire, vec![0, 0, 0, 1, 0, 0, 0, 0]);
    }

    #[test]
    fn config_validate_rejects_zero_buffer() {
        let cfg = EntropyConfig {
            buffer_size: 0,
            ..valid_config()
        };
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn config_validate_enforces_size_bounds() {
        assert!(valid_config().validate().is_ok());

        for (buffer_size, ok) in [(MAX_BUFFER_SIZE, true), (MAX_BUFFER_SIZE + 1, false)] {
            let cfg = EntropyConfig {
                buffer_size,
                ..valid_config()
            };
            assert_eq!(cfg.validate().is_ok(), ok, "buffer_size {buffer_size}");
        }
        for (header_size, ok) in [
            (7, false),
            (8, true),
            (MAX_HEADER_SIZE, true),
            (MAX_HEADER_SIZE + 1, false),
        ] {
            let cfg = EntropyConfig {
                header_size,
                ..valid_config()
            };
            assert_eq!(cfg.validate().is_ok(), ok, "header_size {header_size}");
        }
    }

    #[test]
    fn config_validate_requires_block_alignment_only_when_encrypting() {
        let unaligned = EntropyConfig {
            buffer_size: 24,
            ..valid_config()
        };
        let err = unaligned.validate().unwrap_err();
        assert!(matches!(err, EntropyError::Config(_)));

        let plaintext = EntropyConfig {
            encrypt: false,
            ..unaligned
        };
        assert!(plaintext.validate().is_ok());
    }
}