dynomite/entropy/driver.rs
1//! Periodic entropy reconciliation driver.
2//!
3//! [`EntropyDriver`] is the long-running task that walks a
4//! [`ServerPool`]'s peer list at a configured cadence and calls
5//! [`reconcile_with_peer`] for every non-local entry. Each cycle
6//! produces a [`ReconCycle`] summary that is logged at INFO level
7//! so operators can verify the run loop is alive and observe
8//! divergence / repair counters as the cluster's state settles.
9//!
10//! The driver uses the existing [`crate::entropy::send::EntropySender::push`]
11//! primitive: each peer interaction is one snapshot push of the
12//! configured [`crate::entropy::SnapshotSource`]. Embedders that
13//! supply a richer source (e.g. one that carries per-range
14//! Merkle digests) get the corresponding richer reconciliation
15//! semantics for free; the default in-memory or RDB-backed
16//! sources still drive a full snapshot push per cycle.
17//!
18//! # Shutdown
19//!
20//! [`EntropyDriver::run_until_shutdown`] honours a
21//! `tokio::sync::watch::Receiver<bool>`: when the flag flips to
22//! `true` the loop drains the in-flight cycle (the
23//! per-peer reconciliations of the current tick complete) and
24//! returns. The next tick is suppressed.
25//!
26//! [`ServerPool`]: crate::cluster::pool::ServerPool
27
28use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
29use std::sync::Arc;
30use std::time::Duration;
31
32use parking_lot::RwLock;
33use tokio::sync::watch;
34
35use crate::cluster::peer::{Peer, PeerEndpoint};
36use crate::entropy::send::EntropySender;
37use crate::entropy::util::EntropyMaterial;
38use crate::entropy::{
39 BoxedSnapshotSource, EntropyConfig, EntropyError, EntropyResult, DEFAULT_BUFFER_SIZE,
40 DEFAULT_HEADER_SIZE,
41};
42
/// Fallback cadence of the entropy run loop: five minutes.
///
/// Matches the operator-visible default of the
/// `recon_interval_seconds:` YAML directive.
pub const DEFAULT_RECON_INTERVAL: Duration = Duration::from_secs(5 * 60);
48
/// TCP port the entropy receiver listens on unless overridden.
///
/// Same value as the reference engine's `ENTROPY_PORT` macro
/// (`8105`). Deployments that run the receiver on another port
/// select it with [`EntropyDriver::with_peer_port`].
pub const DEFAULT_ENTROPY_PORT: u16 = 8_105;
55
/// Counters summarising a single reconciliation pass.
///
/// Every field is a plain running total over the peers visited
/// during one call to [`EntropyDriver::run_cycle`]. All updates
/// use saturating arithmetic, so a counter pins at `u64::MAX`
/// instead of wrapping.
///
/// # Examples
///
/// ```
/// use dynomite::entropy::driver::ReconCycle;
/// let mut c = ReconCycle::default();
/// c.record_attempted();
/// c.record_exchanged(128);
/// assert_eq!(c.peers_attempted, 1);
/// assert_eq!(c.peers_exchanged, 1);
/// assert_eq!(c.ranges_diverged, 1);
/// assert_eq!(c.ranges_repaired, 1);
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReconCycle {
    /// How many peers the driver tried to reach in this cycle.
    pub peers_attempted: u64,
    /// How many of those peers completed a snapshot exchange.
    pub peers_exchanged: u64,
    /// Divergent ranges observed: one per successful exchange
    /// whose pushed snapshot was non-empty.
    pub ranges_diverged: u64,
    /// Divergent ranges repaired. Tracks [`Self::ranges_diverged`]
    /// one-for-one today: a pushed range counts as repaired once
    /// the receiver acknowledges by closing the socket.
    pub ranges_repaired: u64,
}

impl ReconCycle {
    /// Count one more peer the driver attempted to reach.
    pub fn record_attempted(&mut self) {
        self.peers_attempted = self.peers_attempted.saturating_add(1);
    }

    /// Count one successfully completed peer interaction.
    ///
    /// `bytes` is the plaintext snapshot length the sender
    /// pushed. Any non-zero length is booked as exactly one
    /// divergent range that was also repaired; a zero length
    /// leaves both range counters untouched.
    pub fn record_exchanged(&mut self, bytes: usize) {
        // 1 when something was pushed, 0 otherwise; adding 0 is a no-op.
        let pushed_ranges = u64::from(bytes != 0);
        self.peers_exchanged = self.peers_exchanged.saturating_add(1);
        self.ranges_diverged = self.ranges_diverged.saturating_add(pushed_ranges);
        self.ranges_repaired = self.ranges_repaired.saturating_add(pushed_ranges);
    }

    /// Fold the counters of `other` into `self`, field by field.
    pub fn merge(&mut self, other: ReconCycle) {
        // Exhaustive destructuring: adding a field to the struct
        // fails to compile here until the merge accounts for it.
        let ReconCycle {
            peers_attempted,
            peers_exchanged,
            ranges_diverged,
            ranges_repaired,
        } = other;
        self.peers_attempted = self.peers_attempted.saturating_add(peers_attempted);
        self.peers_exchanged = self.peers_exchanged.saturating_add(peers_exchanged);
        self.ranges_diverged = self.ranges_diverged.saturating_add(ranges_diverged);
        self.ranges_repaired = self.ranges_repaired.saturating_add(ranges_repaired);
    }
}
115
116/// Run one reconciliation pass against `peer`.
117///
118/// Dials `peer` on `peer_port`, performs the negotiation
119/// handshake, and pushes one snapshot from `source`. The
120/// returned [`ReconCycle`] always reports `peers_attempted = 1`;
121/// `peers_exchanged` is `1` on success and `0` on failure, with
122/// the error returned in `Err(_)`.
123///
124/// # Errors
125/// [`EntropyError`] for resolution, dial, transport, or crypto
126/// faults. Callers (typically [`EntropyDriver`]) are expected to
127/// log and continue on `Err` rather than abort the cycle.
128///
129/// # Examples
130///
131/// ```no_run
132/// use std::sync::Arc;
133/// use dynomite::cluster::peer::PeerEndpoint;
134/// use dynomite::entropy::driver::reconcile_with_peer;
135/// use dynomite::entropy::send::StaticSnapshot;
136/// use dynomite::entropy::util::{EntropyIv, EntropyKey, EntropyMaterial};
137///
138/// # async fn run() {
139/// let mat = EntropyMaterial::new(
140/// EntropyKey::from_bytes([0x10; 16]),
141/// EntropyIv::from_bytes([0x42; 16]),
142/// );
143/// let source: dynomite::entropy::BoxedSnapshotSource =
144/// Arc::new(StaticSnapshot::new(b"hello".to_vec()));
145/// let peer = PeerEndpoint::tcp("127.0.0.1".into(), 9000);
146/// let cycle = reconcile_with_peer(&mat, &source, &peer, 8105, 256, 64, true)
147/// .await
148/// .unwrap();
149/// assert_eq!(cycle.peers_attempted, 1);
150/// # }
151/// ```
152pub async fn reconcile_with_peer(
153 material: &EntropyMaterial,
154 source: &BoxedSnapshotSource,
155 peer: &PeerEndpoint,
156 peer_port: u16,
157 buffer_size: usize,
158 header_size: usize,
159 encrypt: bool,
160) -> EntropyResult<ReconCycle> {
161 let endpoint = resolve_peer_endpoint(peer, peer_port)?;
162 let cfg = EntropyConfig {
163 // The on-disk paths are unused by `EntropySender::push`
164 // when material is supplied via the in-memory shortcut
165 // below, but the field is non-optional in the public
166 // struct. Use placeholder paths; the sender does not
167 // touch them because we override encryption with the
168 // already-loaded material.
169 key_file: std::path::PathBuf::new(),
170 iv_file: std::path::PathBuf::new(),
171 listen_addr: endpoint,
172 send_addr: None,
173 peer_endpoint: endpoint,
174 buffer_size,
175 header_size,
176 encrypt,
177 };
178 let bytes =
179 EntropySender::push_with_material(cfg, source.clone(), Some(material.clone())).await?;
180 let mut cycle = ReconCycle::default();
181 cycle.record_attempted();
182 cycle.record_exchanged(bytes);
183 Ok(cycle)
184}
185
186fn resolve_peer_endpoint(peer: &PeerEndpoint, port: u16) -> EntropyResult<SocketAddr> {
187 if let Ok(ip) = peer.host().parse::<IpAddr>() {
188 return Ok(SocketAddr::new(ip, port));
189 }
190 let mut iter = (peer.host(), port)
191 .to_socket_addrs()
192 .map_err(EntropyError::Io)?;
193 iter.next().ok_or_else(|| {
194 EntropyError::Config(format!("could not resolve peer host '{}'", peer.host()))
195 })
196}
197
/// Periodic reconciliation driver.
///
/// Constructed by the embedding binary once the entropy key /
/// IV material has been loaded; spawned as a tokio task with
/// [`EntropyDriver::run_until_shutdown`].
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use std::time::Duration;
/// use parking_lot::RwLock;
/// use dynomite::entropy::driver::EntropyDriver;
/// use dynomite::entropy::send::StaticSnapshot;
/// use dynomite::entropy::util::{EntropyIv, EntropyKey, EntropyMaterial};
///
/// let mat = EntropyMaterial::new(
///     EntropyKey::from_bytes([0x10; 16]),
///     EntropyIv::from_bytes([0x42; 16]),
/// );
/// let source: dynomite::entropy::BoxedSnapshotSource =
///     Arc::new(StaticSnapshot::new(Vec::new()));
/// let peers = Arc::new(RwLock::new(Vec::new()));
/// let driver = EntropyDriver::new(mat, source, peers, Duration::from_secs(300));
/// assert_eq!(driver.cadence(), Duration::from_secs(300));
/// ```
pub struct EntropyDriver {
    /// Key / IV material passed to every per-peer push.
    material: EntropyMaterial,
    /// Snapshot source; a clone of the handle is given to each push.
    source: BoxedSnapshotSource,
    /// Shared peer list. Read-locked only long enough to copy out
    /// the non-local peers at the start of each cycle.
    peers: Arc<RwLock<Vec<Peer>>>,
    /// Period of the run loop. Never zero: `new` substitutes
    /// `DEFAULT_RECON_INTERVAL` for a zero cadence.
    cadence: Duration,
    /// Port dialled on every peer (defaults to `DEFAULT_ENTROPY_PORT`).
    peer_port: u16,
    /// Forwarded as `EntropyConfig::buffer_size` on every push.
    buffer_size: usize,
    /// Forwarded as `EntropyConfig::header_size` on every push.
    header_size: usize,
    /// Forwarded as `EntropyConfig::encrypt` on every push.
    encrypt: bool,
}
234
235impl EntropyDriver {
236 /// Build a driver with the default entropy port and chunk
237 /// sizes.
238 #[must_use]
239 pub fn new(
240 material: EntropyMaterial,
241 source: BoxedSnapshotSource,
242 peers: Arc<RwLock<Vec<Peer>>>,
243 cadence: Duration,
244 ) -> Self {
245 Self {
246 material,
247 source,
248 peers,
249 cadence: if cadence.is_zero() {
250 DEFAULT_RECON_INTERVAL
251 } else {
252 cadence
253 },
254 peer_port: DEFAULT_ENTROPY_PORT,
255 buffer_size: DEFAULT_BUFFER_SIZE,
256 header_size: DEFAULT_HEADER_SIZE,
257 encrypt: true,
258 }
259 }
260
261 /// Override the per-peer entropy receiver port.
262 #[must_use]
263 pub fn with_peer_port(mut self, port: u16) -> Self {
264 self.peer_port = port;
265 self
266 }
267
268 /// Override the per-chunk plaintext buffer size in bytes.
269 #[must_use]
270 pub fn with_buffer_size(mut self, bytes: usize) -> Self {
271 self.buffer_size = bytes;
272 self
273 }
274
275 /// Override the snapshot header size in bytes.
276 #[must_use]
277 pub fn with_header_size(mut self, bytes: usize) -> Self {
278 self.header_size = bytes;
279 self
280 }
281
282 /// Disable AES-128-CBC encryption of per-chunk payloads.
283 /// Intended for tests; production deployments leave the
284 /// encryption flag at its default of `true`.
285 #[must_use]
286 pub fn with_encrypt(mut self, on: bool) -> Self {
287 self.encrypt = on;
288 self
289 }
290
291 /// Reconciliation cadence.
292 #[must_use]
293 pub fn cadence(&self) -> Duration {
294 self.cadence
295 }
296
297 /// Per-peer entropy receiver port the driver dials.
298 #[must_use]
299 pub fn peer_port(&self) -> u16 {
300 self.peer_port
301 }
302
303 /// Run a single reconciliation cycle: visit every non-local
304 /// peer in the pool, attempt one snapshot push each, and
305 /// return the aggregated [`ReconCycle`].
306 ///
307 /// Per-peer failures are logged at WARN and recorded as
308 /// `peers_attempted` (without bumping `peers_exchanged`).
309 pub async fn run_cycle(&self) -> ReconCycle {
310 // Snapshot the peer list to a local Vec so we do not
311 // hold the RwLock across awaits. The peer list rarely
312 // changes; copying a handful of `Peer` values per cycle
313 // is cheap relative to the per-peer TCP exchange.
314 let peer_list: Vec<Peer> = {
315 let guard = self.peers.read();
316 guard.iter().filter(|p| !p.is_local()).cloned().collect()
317 };
318 let mut total = ReconCycle::default();
319 for peer in &peer_list {
320 match reconcile_with_peer(
321 &self.material,
322 &self.source,
323 peer.endpoint(),
324 self.peer_port,
325 self.buffer_size,
326 self.header_size,
327 self.encrypt,
328 )
329 .await
330 {
331 Ok(cycle) => total.merge(cycle),
332 Err(e) => {
333 total.record_attempted();
334 tracing::warn!(
335 peer = %peer.endpoint().pname(),
336 error = %e,
337 "entropy reconciliation with peer failed"
338 );
339 }
340 }
341 }
342 total
343 }
344
345 /// Drive the periodic loop until `shutdown` is set.
346 ///
347 /// The first cycle runs immediately, mirroring how the
348 /// reference engine's entropy thread eagerly synchronises on
349 /// startup; subsequent cycles fire on `cadence`. A shutdown
350 /// observed mid-cycle is honoured at the next per-peer
351 /// boundary so the in-flight peer interaction completes
352 /// (the driver does not abort the AES handshake mid-frame).
353 pub async fn run_until_shutdown(self, mut shutdown: watch::Receiver<bool>) {
354 if *shutdown.borrow() {
355 return;
356 }
357 let mut tick = tokio::time::interval(self.cadence);
358 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
359 loop {
360 tokio::select! {
361 biased;
362 changed = shutdown.changed() => {
363 if changed.is_err() || *shutdown.borrow() {
364 tracing::info!("entropy driver shutting down");
365 return;
366 }
367 }
368 _ = tick.tick() => {
369 let cycle = self.run_cycle().await;
370 tracing::info!(
371 peers_attempted = cycle.peers_attempted,
372 peers_exchanged = cycle.peers_exchanged,
373 ranges_diverged = cycle.ranges_diverged,
374 ranges_repaired = cycle.ranges_repaired,
375 "entropy reconciliation cycle completed"
376 );
377 }
378 }
379 }
380 }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::{Peer, PeerEndpoint};
    use crate::entropy::send::StaticSnapshot;
    use crate::entropy::util::{EntropyIv, EntropyKey};
    use crate::hashkit::DynToken;

    /// Fixed key / IV pair shared by every test driver.
    fn material() -> EntropyMaterial {
        let key = EntropyKey::from_bytes([0x10; 16]);
        let iv = EntropyIv::from_bytes([0x42; 16]);
        EntropyMaterial::new(key, iv)
    }

    /// Snapshot source that yields zero bytes.
    fn empty_source() -> BoxedSnapshotSource {
        Arc::new(StaticSnapshot::new(Vec::new()))
    }

    /// Driver over `list` with the fixed material and an empty source.
    fn driver_with(list: Vec<Peer>, cadence: Duration) -> EntropyDriver {
        let shared = Arc::new(RwLock::new(list));
        EntropyDriver::new(material(), empty_source(), shared, cadence)
    }

    #[test]
    fn cadence_defaults_to_five_minutes_when_zero() {
        let driver = driver_with(Vec::new(), Duration::ZERO);
        assert_eq!(driver.cadence(), DEFAULT_RECON_INTERVAL);
    }

    #[test]
    fn cadence_passthrough_for_nonzero() {
        let one_second = Duration::from_secs(1);
        let driver = driver_with(Vec::new(), one_second);
        assert_eq!(driver.cadence(), one_second);
        assert_eq!(driver.peer_port(), DEFAULT_ENTROPY_PORT);
    }

    #[test]
    fn cycle_record_helpers() {
        let mut c = ReconCycle::default();
        for _ in 0..2 {
            c.record_attempted();
        }
        for bytes in [0, 128] {
            c.record_exchanged(bytes);
        }
        assert_eq!(c.peers_attempted, 2);
        assert_eq!(c.peers_exchanged, 2);
        assert_eq!(c.ranges_diverged, 1);
        assert_eq!(c.ranges_repaired, 1);
    }

    #[test]
    fn cycle_merge_sums_fields() {
        let mut a = ReconCycle::default();
        a.record_attempted();
        a.record_exchanged(64);

        let mut b = ReconCycle::default();
        b.record_attempted();
        b.record_exchanged(0);

        a.merge(b);
        assert_eq!(a.peers_attempted, 2);
        assert_eq!(a.peers_exchanged, 2);
        assert_eq!(a.ranges_diverged, 1);
    }

    #[tokio::test]
    async fn driver_skips_local_peers_in_cycle() {
        // With nothing but the local peer in the pool, a cycle
        // must finish without attempting any exchange.
        let local = Peer::new(
            0,
            PeerEndpoint::tcp("127.0.0.1".into(), 1),
            "r".into(),
            "d".into(),
            vec![DynToken::from_u32(0)],
            true,
            true,
            false,
        );
        let driver = driver_with(vec![local], Duration::from_secs(60));
        let cycle = driver.run_cycle().await;
        assert_eq!(cycle.peers_attempted, 0);
        assert_eq!(cycle.peers_exchanged, 0);
    }

    #[tokio::test]
    async fn driver_returns_immediately_when_shutdown_already_set() {
        let driver = driver_with(Vec::new(), Duration::from_secs(60));
        // The sender is bound for the whole test so the channel
        // stays open while the driver runs.
        let (_keep_alive, rx) = watch::channel(true);
        // The flag is already set, so the driver has to return
        // before its first tick; otherwise it would sit in the
        // loop for the full cadence and trip the timeout.
        let res =
            tokio::time::timeout(Duration::from_millis(500), driver.run_until_shutdown(rx)).await;
        assert!(res.is_ok(), "driver did not honour pre-set shutdown");
    }
}
478}