Skip to main content

dynomite/entropy/
driver.rs

1//! Periodic entropy reconciliation driver.
2//!
3//! [`EntropyDriver`] is the long-running task that walks a
4//! [`ServerPool`]'s peer list at a configured cadence and calls
5//! [`reconcile_with_peer`] for every non-local entry. Each cycle
6//! produces a [`ReconCycle`] summary that is logged at INFO level
7//! so operators can verify the run loop is alive and observe
8//! divergence / repair counters as the cluster's state settles.
9//!
//! The driver uses the existing
//! [`crate::entropy::send::EntropySender::push_with_material`] primitive:
//! each peer interaction is one snapshot push of the configured
//! [`crate::entropy::SnapshotSource`], using key / IV material loaded
//! once by the embedder. Embedders that
13//! supply a richer source (e.g. one that carries per-range
14//! Merkle digests) get the corresponding richer reconciliation
15//! semantics for free; the default in-memory or RDB-backed
16//! sources still drive a full snapshot push per cycle.
17//!
18//! # Shutdown
19//!
20//! [`EntropyDriver::run_until_shutdown`] honours a
21//! `tokio::sync::watch::Receiver<bool>`: when the flag flips to
22//! `true` the loop drains the in-flight cycle (the
23//! per-peer reconciliations of the current tick complete) and
24//! returns. The next tick is suppressed.
25//!
26//! [`ServerPool`]: crate::cluster::pool::ServerPool
27
28use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
29use std::sync::Arc;
30use std::time::Duration;
31
32use parking_lot::RwLock;
33use tokio::sync::watch;
34
35use crate::cluster::peer::{Peer, PeerEndpoint};
36use crate::entropy::send::EntropySender;
37use crate::entropy::util::EntropyMaterial;
38use crate::entropy::{
39    BoxedSnapshotSource, EntropyConfig, EntropyError, EntropyResult, DEFAULT_BUFFER_SIZE,
40    DEFAULT_HEADER_SIZE,
41};
42
/// Default cadence for the entropy run loop: five minutes.
///
/// Matches the operator-visible default of the
/// `recon_interval_seconds:` YAML directive.
pub const DEFAULT_RECON_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Default TCP port the entropy receiver listens on.
///
/// Matches the reference engine's `ENTROPY_PORT` macro (`8105`).
/// Deployments whose receivers listen elsewhere can point a driver
/// at their port with [`EntropyDriver::with_peer_port`].
pub const DEFAULT_ENTROPY_PORT: u16 = 8105;
55
/// Counters summarising one reconciliation pass.
///
/// Every field is a running total over the peers visited during a
/// single [`EntropyDriver::run_cycle`]. All updates saturate at
/// `u64::MAX` instead of wrapping.
///
/// # Examples
///
/// ```
/// use dynomite::entropy::driver::ReconCycle;
/// let mut c = ReconCycle::default();
/// c.record_attempted();
/// c.record_exchanged(128);
/// assert_eq!(c.peers_attempted, 1);
/// assert_eq!(c.peers_exchanged, 1);
/// assert_eq!(c.ranges_diverged, 1);
/// assert_eq!(c.ranges_repaired, 1);
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReconCycle {
    /// Peers the driver tried to dial this cycle, successful or not.
    pub peers_attempted: u64,
    /// Peers a snapshot was successfully exchanged with.
    pub peers_exchanged: u64,
    /// Divergent ranges observed: one per successful exchange whose
    /// pushed snapshot was non-empty.
    pub ranges_diverged: u64,
    /// Divergent ranges repaired. [`Self::record_exchanged`] advances
    /// this in lock-step with [`Self::ranges_diverged`]: every pushed
    /// range is counted as repaired.
    pub ranges_repaired: u64,
}

impl ReconCycle {
    /// Saturating `*slot += by`, shared by every counter update.
    fn bump(slot: &mut u64, by: u64) {
        *slot = slot.saturating_add(by);
    }

    /// Count one more dialled peer.
    pub fn record_attempted(&mut self) {
        Self::bump(&mut self.peers_attempted, 1);
    }

    /// Count one successful peer interaction.
    ///
    /// `bytes` is the plaintext snapshot length the sender pushed.
    /// A non-zero length additionally counts as one divergent range
    /// that was repaired; an empty push only bumps `peers_exchanged`.
    pub fn record_exchanged(&mut self, bytes: usize) {
        Self::bump(&mut self.peers_exchanged, 1);
        if bytes == 0 {
            return;
        }
        Self::bump(&mut self.ranges_diverged, 1);
        Self::bump(&mut self.ranges_repaired, 1);
    }

    /// Fold the counters of `other` into `self`, field by field.
    pub fn merge(&mut self, other: ReconCycle) {
        // Destructuring makes adding a field a compile error here
        // until the new counter is merged too.
        let ReconCycle {
            peers_attempted,
            peers_exchanged,
            ranges_diverged,
            ranges_repaired,
        } = other;
        Self::bump(&mut self.peers_attempted, peers_attempted);
        Self::bump(&mut self.peers_exchanged, peers_exchanged);
        Self::bump(&mut self.ranges_diverged, ranges_diverged);
        Self::bump(&mut self.ranges_repaired, ranges_repaired);
    }
}
115
116/// Run one reconciliation pass against `peer`.
117///
118/// Dials `peer` on `peer_port`, performs the negotiation
119/// handshake, and pushes one snapshot from `source`. The
120/// returned [`ReconCycle`] always reports `peers_attempted = 1`;
121/// `peers_exchanged` is `1` on success and `0` on failure, with
122/// the error returned in `Err(_)`.
123///
124/// # Errors
125/// [`EntropyError`] for resolution, dial, transport, or crypto
126/// faults. Callers (typically [`EntropyDriver`]) are expected to
127/// log and continue on `Err` rather than abort the cycle.
128///
129/// # Examples
130///
131/// ```no_run
132/// use std::sync::Arc;
133/// use dynomite::cluster::peer::PeerEndpoint;
134/// use dynomite::entropy::driver::reconcile_with_peer;
135/// use dynomite::entropy::send::StaticSnapshot;
136/// use dynomite::entropy::util::{EntropyIv, EntropyKey, EntropyMaterial};
137///
138/// # async fn run() {
139/// let mat = EntropyMaterial::new(
140///     EntropyKey::from_bytes([0x10; 16]),
141///     EntropyIv::from_bytes([0x42; 16]),
142/// );
143/// let source: dynomite::entropy::BoxedSnapshotSource =
144///     Arc::new(StaticSnapshot::new(b"hello".to_vec()));
145/// let peer = PeerEndpoint::tcp("127.0.0.1".into(), 9000);
146/// let cycle = reconcile_with_peer(&mat, &source, &peer, 8105, 256, 64, true)
147///     .await
148///     .unwrap();
149/// assert_eq!(cycle.peers_attempted, 1);
150/// # }
151/// ```
152pub async fn reconcile_with_peer(
153    material: &EntropyMaterial,
154    source: &BoxedSnapshotSource,
155    peer: &PeerEndpoint,
156    peer_port: u16,
157    buffer_size: usize,
158    header_size: usize,
159    encrypt: bool,
160) -> EntropyResult<ReconCycle> {
161    let endpoint = resolve_peer_endpoint(peer, peer_port)?;
162    let cfg = EntropyConfig {
163        // The on-disk paths are unused by `EntropySender::push`
164        // when material is supplied via the in-memory shortcut
165        // below, but the field is non-optional in the public
166        // struct. Use placeholder paths; the sender does not
167        // touch them because we override encryption with the
168        // already-loaded material.
169        key_file: std::path::PathBuf::new(),
170        iv_file: std::path::PathBuf::new(),
171        listen_addr: endpoint,
172        send_addr: None,
173        peer_endpoint: endpoint,
174        buffer_size,
175        header_size,
176        encrypt,
177    };
178    let bytes =
179        EntropySender::push_with_material(cfg, source.clone(), Some(material.clone())).await?;
180    let mut cycle = ReconCycle::default();
181    cycle.record_attempted();
182    cycle.record_exchanged(bytes);
183    Ok(cycle)
184}
185
186fn resolve_peer_endpoint(peer: &PeerEndpoint, port: u16) -> EntropyResult<SocketAddr> {
187    if let Ok(ip) = peer.host().parse::<IpAddr>() {
188        return Ok(SocketAddr::new(ip, port));
189    }
190    let mut iter = (peer.host(), port)
191        .to_socket_addrs()
192        .map_err(EntropyError::Io)?;
193    iter.next().ok_or_else(|| {
194        EntropyError::Config(format!("could not resolve peer host '{}'", peer.host()))
195    })
196}
197
198/// Periodic reconciliation driver.
199///
200/// Constructed by the embedding binary once the entropy key /
201/// IV material has been loaded; spawned as a tokio task with
202/// [`EntropyDriver::run_until_shutdown`].
203///
204/// # Examples
205///
206/// ```
207/// use std::sync::Arc;
208/// use std::time::Duration;
209/// use parking_lot::RwLock;
210/// use dynomite::entropy::driver::EntropyDriver;
211/// use dynomite::entropy::send::StaticSnapshot;
212/// use dynomite::entropy::util::{EntropyIv, EntropyKey, EntropyMaterial};
213///
214/// let mat = EntropyMaterial::new(
215///     EntropyKey::from_bytes([0x10; 16]),
216///     EntropyIv::from_bytes([0x42; 16]),
217/// );
218/// let source: dynomite::entropy::BoxedSnapshotSource =
219///     Arc::new(StaticSnapshot::new(Vec::new()));
220/// let peers = Arc::new(RwLock::new(Vec::new()));
221/// let driver = EntropyDriver::new(mat, source, peers, Duration::from_secs(300));
222/// assert_eq!(driver.cadence(), Duration::from_secs(300));
223/// ```
224pub struct EntropyDriver {
225    material: EntropyMaterial,
226    source: BoxedSnapshotSource,
227    peers: Arc<RwLock<Vec<Peer>>>,
228    cadence: Duration,
229    peer_port: u16,
230    buffer_size: usize,
231    header_size: usize,
232    encrypt: bool,
233}
234
235impl EntropyDriver {
236    /// Build a driver with the default entropy port and chunk
237    /// sizes.
238    #[must_use]
239    pub fn new(
240        material: EntropyMaterial,
241        source: BoxedSnapshotSource,
242        peers: Arc<RwLock<Vec<Peer>>>,
243        cadence: Duration,
244    ) -> Self {
245        Self {
246            material,
247            source,
248            peers,
249            cadence: if cadence.is_zero() {
250                DEFAULT_RECON_INTERVAL
251            } else {
252                cadence
253            },
254            peer_port: DEFAULT_ENTROPY_PORT,
255            buffer_size: DEFAULT_BUFFER_SIZE,
256            header_size: DEFAULT_HEADER_SIZE,
257            encrypt: true,
258        }
259    }
260
261    /// Override the per-peer entropy receiver port.
262    #[must_use]
263    pub fn with_peer_port(mut self, port: u16) -> Self {
264        self.peer_port = port;
265        self
266    }
267
268    /// Override the per-chunk plaintext buffer size in bytes.
269    #[must_use]
270    pub fn with_buffer_size(mut self, bytes: usize) -> Self {
271        self.buffer_size = bytes;
272        self
273    }
274
275    /// Override the snapshot header size in bytes.
276    #[must_use]
277    pub fn with_header_size(mut self, bytes: usize) -> Self {
278        self.header_size = bytes;
279        self
280    }
281
282    /// Disable AES-128-CBC encryption of per-chunk payloads.
283    /// Intended for tests; production deployments leave the
284    /// encryption flag at its default of `true`.
285    #[must_use]
286    pub fn with_encrypt(mut self, on: bool) -> Self {
287        self.encrypt = on;
288        self
289    }
290
291    /// Reconciliation cadence.
292    #[must_use]
293    pub fn cadence(&self) -> Duration {
294        self.cadence
295    }
296
297    /// Per-peer entropy receiver port the driver dials.
298    #[must_use]
299    pub fn peer_port(&self) -> u16 {
300        self.peer_port
301    }
302
303    /// Run a single reconciliation cycle: visit every non-local
304    /// peer in the pool, attempt one snapshot push each, and
305    /// return the aggregated [`ReconCycle`].
306    ///
307    /// Per-peer failures are logged at WARN and recorded as
308    /// `peers_attempted` (without bumping `peers_exchanged`).
309    pub async fn run_cycle(&self) -> ReconCycle {
310        // Snapshot the peer list to a local Vec so we do not
311        // hold the RwLock across awaits. The peer list rarely
312        // changes; copying a handful of `Peer` values per cycle
313        // is cheap relative to the per-peer TCP exchange.
314        let peer_list: Vec<Peer> = {
315            let guard = self.peers.read();
316            guard.iter().filter(|p| !p.is_local()).cloned().collect()
317        };
318        let mut total = ReconCycle::default();
319        for peer in &peer_list {
320            match reconcile_with_peer(
321                &self.material,
322                &self.source,
323                peer.endpoint(),
324                self.peer_port,
325                self.buffer_size,
326                self.header_size,
327                self.encrypt,
328            )
329            .await
330            {
331                Ok(cycle) => total.merge(cycle),
332                Err(e) => {
333                    total.record_attempted();
334                    tracing::warn!(
335                        peer = %peer.endpoint().pname(),
336                        error = %e,
337                        "entropy reconciliation with peer failed"
338                    );
339                }
340            }
341        }
342        total
343    }
344
345    /// Drive the periodic loop until `shutdown` is set.
346    ///
347    /// The first cycle runs immediately, mirroring how the
348    /// reference engine's entropy thread eagerly synchronises on
349    /// startup; subsequent cycles fire on `cadence`. A shutdown
350    /// observed mid-cycle is honoured at the next per-peer
351    /// boundary so the in-flight peer interaction completes
352    /// (the driver does not abort the AES handshake mid-frame).
353    pub async fn run_until_shutdown(self, mut shutdown: watch::Receiver<bool>) {
354        if *shutdown.borrow() {
355            return;
356        }
357        let mut tick = tokio::time::interval(self.cadence);
358        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
359        loop {
360            tokio::select! {
361                biased;
362                changed = shutdown.changed() => {
363                    if changed.is_err() || *shutdown.borrow() {
364                        tracing::info!("entropy driver shutting down");
365                        return;
366                    }
367                }
368                _ = tick.tick() => {
369                    let cycle = self.run_cycle().await;
370                    tracing::info!(
371                        peers_attempted = cycle.peers_attempted,
372                        peers_exchanged = cycle.peers_exchanged,
373                        ranges_diverged = cycle.ranges_diverged,
374                        ranges_repaired = cycle.ranges_repaired,
375                        "entropy reconciliation cycle completed"
376                    );
377                }
378            }
379        }
380    }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::{Peer, PeerEndpoint};
    use crate::entropy::send::StaticSnapshot;
    use crate::entropy::util::{EntropyIv, EntropyKey};
    use crate::hashkit::DynToken;

    fn material() -> EntropyMaterial {
        EntropyMaterial::new(
            EntropyKey::from_bytes([0x10; 16]),
            EntropyIv::from_bytes([0x42; 16]),
        )
    }

    fn empty_source() -> BoxedSnapshotSource {
        Arc::new(StaticSnapshot::new(Vec::new()))
    }

    /// Driver over an empty peer list with a cadence long enough
    /// that no second tick fires during a test.
    fn idle_driver() -> EntropyDriver {
        let peers = Arc::new(RwLock::new(Vec::new()));
        EntropyDriver::new(material(), empty_source(), peers, Duration::from_secs(60))
    }

    #[test]
    fn cadence_defaults_to_five_minutes_when_zero() {
        let peers = Arc::new(RwLock::new(Vec::new()));
        let driver = EntropyDriver::new(material(), empty_source(), peers, Duration::ZERO);
        assert_eq!(driver.cadence(), DEFAULT_RECON_INTERVAL);
    }

    #[test]
    fn cadence_passthrough_for_nonzero() {
        let peers = Arc::new(RwLock::new(Vec::new()));
        let driver = EntropyDriver::new(material(), empty_source(), peers, Duration::from_secs(1));
        assert_eq!(driver.cadence(), Duration::from_secs(1));
        assert_eq!(driver.peer_port(), DEFAULT_ENTROPY_PORT);
    }

    #[test]
    fn cycle_record_helpers() {
        let mut c = ReconCycle::default();
        c.record_attempted();
        c.record_attempted();
        c.record_exchanged(0);
        c.record_exchanged(128);
        assert_eq!(c.peers_attempted, 2);
        assert_eq!(c.peers_exchanged, 2);
        assert_eq!(c.ranges_diverged, 1);
        assert_eq!(c.ranges_repaired, 1);
    }

    #[test]
    fn cycle_merge_sums_fields() {
        let mut a = ReconCycle::default();
        a.record_attempted();
        a.record_exchanged(64);
        let mut b = ReconCycle::default();
        b.record_attempted();
        b.record_exchanged(0);
        a.merge(b);
        assert_eq!(a.peers_attempted, 2);
        assert_eq!(a.peers_exchanged, 2);
        assert_eq!(a.ranges_diverged, 1);
    }

    #[test]
    fn cycle_merge_saturates_instead_of_wrapping() {
        let mut a = ReconCycle {
            peers_attempted: u64::MAX,
            ..ReconCycle::default()
        };
        let mut b = ReconCycle::default();
        b.record_attempted();
        a.merge(b);
        assert_eq!(a.peers_attempted, u64::MAX);
    }

    #[tokio::test]
    async fn driver_skips_local_peers_in_cycle() {
        // A pool with only the local peer must complete a
        // cycle in zero attempts.
        let local = Peer::new(
            0,
            PeerEndpoint::tcp("127.0.0.1".into(), 1),
            "r".into(),
            "d".into(),
            vec![DynToken::from_u32(0)],
            true,
            true,
            false,
        );
        let peers = Arc::new(RwLock::new(vec![local]));
        let driver = EntropyDriver::new(material(), empty_source(), peers, Duration::from_secs(60));
        let cycle = driver.run_cycle().await;
        assert_eq!(cycle.peers_attempted, 0);
        assert_eq!(cycle.peers_exchanged, 0);
    }

    #[tokio::test]
    async fn driver_returns_immediately_when_shutdown_already_set() {
        let driver = idle_driver();
        let (tx, rx) = watch::channel(true);
        // The driver must observe the pre-set flag and return
        // without entering the loop; if it ran its eager first
        // cycle it would then block for the full cadence.
        let res =
            tokio::time::timeout(Duration::from_millis(500), driver.run_until_shutdown(rx)).await;
        assert!(res.is_ok(), "driver did not honour pre-set shutdown");
        drop(tx);
    }

    #[tokio::test]
    async fn driver_stops_when_shutdown_flips_after_start() {
        let driver = idle_driver();
        let (tx, rx) = watch::channel(false);
        let signal = async {
            // Let the driver run its eager first cycle and park on
            // the next tick before asking it to stop.
            tokio::time::sleep(Duration::from_millis(20)).await;
            tx.send(true).expect("driver still holds the receiver");
        };
        let res = tokio::time::timeout(Duration::from_millis(500), async {
            tokio::join!(driver.run_until_shutdown(rx), signal)
        })
        .await;
        assert!(res.is_ok(), "driver did not honour shutdown signalled after start");
    }

    #[tokio::test]
    async fn driver_stops_when_shutdown_sender_dropped() {
        let driver = idle_driver();
        let (tx, rx) = watch::channel(false);
        let hang_up = async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            drop(tx);
        };
        let res = tokio::time::timeout(Duration::from_millis(500), async {
            tokio::join!(driver.run_until_shutdown(rx), hang_up)
        })
        .await;
        assert!(res.is_ok(), "driver did not stop after the shutdown sender was dropped");
    }
}