use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use parking_lot::RwLock;
use tokio::sync::watch;
use crate::cluster::peer::{Peer, PeerEndpoint};
use crate::entropy::send::EntropySender;
use crate::entropy::util::EntropyMaterial;
use crate::entropy::{
BoxedSnapshotSource, EntropyConfig, EntropyError, EntropyResult, DEFAULT_BUFFER_SIZE,
DEFAULT_HEADER_SIZE,
};
/// Reconciliation cadence that [`EntropyDriver::new`] substitutes when it is handed a
/// zero duration (five minutes).
pub const DEFAULT_RECON_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Peer port a freshly constructed [`EntropyDriver`] targets until it is overridden
/// through [`EntropyDriver::with_peer_port`].
pub const DEFAULT_ENTROPY_PORT: u16 = 8105;
/// Counters summarising one reconciliation pass.
///
/// Every counter saturates at `u64::MAX` instead of wrapping.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReconCycle {
    /// Number of peers a reconciliation was attempted against.
    pub peers_attempted: u64,
    /// Number of peers for which an exchange was recorded.
    pub peers_exchanged: u64,
    /// Bumped once for every recorded exchange that reported a non-zero byte count.
    pub ranges_diverged: u64,
    /// Bumped together with `ranges_diverged`.
    pub ranges_repaired: u64,
}

impl ReconCycle {
    /// Adds `amount` to `counter`, clamping the result at `u64::MAX`.
    fn bump(counter: &mut u64, amount: u64) {
        *counter = counter.saturating_add(amount);
    }

    /// Counts one more attempted peer.
    pub fn record_attempted(&mut self) {
        Self::bump(&mut self.peers_attempted, 1);
    }

    /// Counts one more exchanged peer.
    ///
    /// When `bytes` is non-zero, one diverged range and one repaired range are counted
    /// as well; a zero `bytes` leaves both range counters untouched.
    pub fn record_exchanged(&mut self, bytes: usize) {
        Self::bump(&mut self.peers_exchanged, 1);
        if bytes == 0 {
            return;
        }
        Self::bump(&mut self.ranges_diverged, 1);
        Self::bump(&mut self.ranges_repaired, 1);
    }

    /// Folds the counters of `other` into `self`, field by field.
    pub fn merge(&mut self, other: ReconCycle) {
        // Destructuring makes the compiler flag any counter added later but not merged.
        let ReconCycle {
            peers_attempted,
            peers_exchanged,
            ranges_diverged,
            ranges_repaired,
        } = other;
        Self::bump(&mut self.peers_attempted, peers_attempted);
        Self::bump(&mut self.peers_exchanged, peers_exchanged);
        Self::bump(&mut self.ranges_diverged, ranges_diverged);
        Self::bump(&mut self.ranges_repaired, ranges_repaired);
    }
}
pub async fn reconcile_with_peer(
material: &EntropyMaterial,
source: &BoxedSnapshotSource,
peer: &PeerEndpoint,
peer_port: u16,
buffer_size: usize,
header_size: usize,
encrypt: bool,
) -> EntropyResult<ReconCycle> {
let endpoint = resolve_peer_endpoint(peer, peer_port)?;
let cfg = EntropyConfig {
key_file: std::path::PathBuf::new(),
iv_file: std::path::PathBuf::new(),
listen_addr: endpoint,
send_addr: None,
peer_endpoint: endpoint,
buffer_size,
header_size,
encrypt,
};
let bytes =
EntropySender::push_with_material(cfg, source.clone(), Some(material.clone())).await?;
let mut cycle = ReconCycle::default();
cycle.record_attempted();
cycle.record_exchanged(bytes);
Ok(cycle)
}
fn resolve_peer_endpoint(peer: &PeerEndpoint, port: u16) -> EntropyResult<SocketAddr> {
if let Ok(ip) = peer.host().parse::<IpAddr>() {
return Ok(SocketAddr::new(ip, port));
}
let mut iter = (peer.host(), port)
.to_socket_addrs()
.map_err(EntropyError::Io)?;
iter.next().ok_or_else(|| {
EntropyError::Config(format!("could not resolve peer host '{}'", peer.host()))
})
}
/// Drives reconciliation against the non-local peers of a shared peer list, either for a
/// single cycle or repeatedly on a fixed cadence until a shutdown signal arrives.
pub struct EntropyDriver {
/// Key material handed to every per-peer reconciliation.
material: EntropyMaterial,
/// Snapshot source handed to every per-peer reconciliation.
source: BoxedSnapshotSource,
/// Shared peer list; it is re-read at the start of each cycle and local peers are skipped.
peers: Arc<RwLock<Vec<Peer>>>,
/// Period of the reconciliation timer; the constructor replaces a zero value with
/// `DEFAULT_RECON_INTERVAL`.
cadence: Duration,
/// Port combined with each peer's host to form the address that is contacted.
peer_port: u16,
/// Forwarded unchanged as the `buffer_size` of the per-peer `EntropyConfig`.
buffer_size: usize,
/// Forwarded unchanged as the `header_size` of the per-peer `EntropyConfig`.
header_size: usize,
/// Forwarded unchanged as the `encrypt` flag of the per-peer `EntropyConfig`.
encrypt: bool,
}
impl EntropyDriver {
    /// Creates a driver over `peers` that reconciles every `cadence`.
    ///
    /// A zero `cadence` is replaced by [`DEFAULT_RECON_INTERVAL`]. The peer port, buffer
    /// size and header size start at their `DEFAULT_*` values and encryption starts
    /// enabled; use the `with_*` methods to override them.
    #[must_use]
    pub fn new(
        material: EntropyMaterial,
        source: BoxedSnapshotSource,
        peers: Arc<RwLock<Vec<Peer>>>,
        cadence: Duration,
    ) -> Self {
        // A zero period is never stored, so the timer built later always has a real period.
        let cadence = Some(cadence)
            .filter(|requested| !requested.is_zero())
            .unwrap_or(DEFAULT_RECON_INTERVAL);
        Self {
            material,
            source,
            peers,
            cadence,
            peer_port: DEFAULT_ENTROPY_PORT,
            buffer_size: DEFAULT_BUFFER_SIZE,
            header_size: DEFAULT_HEADER_SIZE,
            encrypt: true,
        }
    }

    /// Returns the driver with the peer port replaced by `port`.
    #[must_use]
    pub fn with_peer_port(self, port: u16) -> Self {
        Self {
            peer_port: port,
            ..self
        }
    }

    /// Returns the driver with the buffer size replaced by `bytes`.
    #[must_use]
    pub fn with_buffer_size(self, bytes: usize) -> Self {
        Self {
            buffer_size: bytes,
            ..self
        }
    }

    /// Returns the driver with the header size replaced by `bytes`.
    #[must_use]
    pub fn with_header_size(self, bytes: usize) -> Self {
        Self {
            header_size: bytes,
            ..self
        }
    }

    /// Returns the driver with encryption switched on or off.
    #[must_use]
    pub fn with_encrypt(self, on: bool) -> Self {
        Self {
            encrypt: on,
            ..self
        }
    }

    /// The period between reconciliation cycles.
    #[must_use]
    pub fn cadence(&self) -> Duration {
        self.cadence
    }

    /// The port used when contacting peers.
    #[must_use]
    pub fn peer_port(&self) -> u16 {
        self.peer_port
    }

    /// Reconciles once with every non-local peer, one peer after another.
    ///
    /// A failure against one peer is logged as a warning and counted as an attempt; the
    /// remaining peers are still processed. The returned [`ReconCycle`] aggregates all of
    /// them.
    pub async fn run_cycle(&self) -> ReconCycle {
        // Copy the remote peers out in a single statement so the read guard is released
        // before the first `.await`.
        let remotes: Vec<Peer> = self
            .peers
            .read()
            .iter()
            .filter(|candidate| !candidate.is_local())
            .cloned()
            .collect();

        let mut summary = ReconCycle::default();
        for remote in &remotes {
            let outcome = reconcile_with_peer(
                &self.material,
                &self.source,
                remote.endpoint(),
                self.peer_port,
                self.buffer_size,
                self.header_size,
                self.encrypt,
            )
            .await;
            match outcome {
                Ok(per_peer) => summary.merge(per_peer),
                Err(failure) => {
                    // A failed exchange still counts as an attempt.
                    summary.record_attempted();
                    tracing::warn!(
                        peer = %remote.endpoint().pname(),
                        error = %failure,
                        "entropy reconciliation with peer failed"
                    );
                }
            }
        }
        summary
    }

    /// Runs reconciliation cycles on the configured cadence until shutdown is signalled.
    ///
    /// Returns immediately when `shutdown` already holds `true`. Otherwise the loop ends
    /// when the watched value becomes `true` or the sending side is dropped. The shutdown
    /// signal is only polled between cycles, so a cycle that has started runs to
    /// completion first. Tokio intervals complete their first tick immediately, so the
    /// first cycle starts right away.
    pub async fn run_until_shutdown(self, mut shutdown: watch::Receiver<bool>) {
        let already_stopped = *shutdown.borrow();
        if already_stopped {
            return;
        }

        let mut ticker = tokio::time::interval(self.cadence);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                // Poll the shutdown branch first so a pending stop wins over a ready tick.
                biased;
                signal = shutdown.changed() => {
                    let stop_requested = match signal {
                        // The sender is gone; nothing can signal shutdown any more.
                        Err(_) => true,
                        Ok(()) => *shutdown.borrow(),
                    };
                    if stop_requested {
                        tracing::info!("entropy driver shutting down");
                        return;
                    }
                }
                _ = ticker.tick() => {
                    let report = self.run_cycle().await;
                    tracing::info!(
                        peers_attempted = report.peers_attempted,
                        peers_exchanged = report.peers_exchanged,
                        ranges_diverged = report.ranges_diverged,
                        ranges_repaired = report.ranges_repaired,
                        "entropy reconciliation cycle completed"
                    );
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::peer::{Peer, PeerEndpoint};
    use crate::entropy::send::StaticSnapshot;
    use crate::entropy::util::{EntropyIv, EntropyKey};
    use crate::hashkit::DynToken;

    /// Fixed key/IV pair shared by the driver tests.
    fn material() -> EntropyMaterial {
        EntropyMaterial::new(
            EntropyKey::from_bytes([0x10; 16]),
            EntropyIv::from_bytes([0x42; 16]),
        )
    }

    /// Snapshot source built from an empty `Vec`.
    fn empty_source() -> BoxedSnapshotSource {
        Arc::new(StaticSnapshot::new(Vec::new()))
    }

    #[test]
    fn cadence_defaults_to_five_minutes_when_zero() {
        let peers = Arc::new(RwLock::new(Vec::new()));
        let driver = EntropyDriver::new(material(), empty_source(), peers, Duration::ZERO);
        assert_eq!(driver.cadence(), DEFAULT_RECON_INTERVAL);
    }

    #[test]
    fn cadence_passthrough_for_nonzero() {
        let peers = Arc::new(RwLock::new(Vec::new()));
        let driver = EntropyDriver::new(material(), empty_source(), peers, Duration::from_secs(1));
        assert_eq!(driver.cadence(), Duration::from_secs(1));
        assert_eq!(driver.peer_port(), DEFAULT_ENTROPY_PORT);
    }

    #[test]
    fn cycle_record_helpers() {
        let mut c = ReconCycle::default();
        c.record_attempted();
        c.record_attempted();
        c.record_exchanged(0);
        c.record_exchanged(128);
        assert_eq!(c.peers_attempted, 2);
        assert_eq!(c.peers_exchanged, 2);
        assert_eq!(c.ranges_diverged, 1);
        assert_eq!(c.ranges_repaired, 1);
    }

    #[test]
    fn cycle_merge_sums_fields() {
        let mut a = ReconCycle::default();
        a.record_attempted();
        a.record_exchanged(64);
        let mut b = ReconCycle::default();
        b.record_attempted();
        b.record_exchanged(0);
        a.merge(b);
        assert_eq!(a.peers_attempted, 2);
        assert_eq!(a.peers_exchanged, 2);
        assert_eq!(a.ranges_diverged, 1);
        // Check every counter, so a merge that forgets one of them cannot pass.
        assert_eq!(a.ranges_repaired, 1);
    }

    #[test]
    fn cycle_counters_saturate_instead_of_overflowing() {
        let mut full = ReconCycle {
            peers_attempted: u64::MAX,
            peers_exchanged: u64::MAX,
            ranges_diverged: u64::MAX,
            ranges_repaired: u64::MAX,
        };
        full.record_attempted();
        full.record_exchanged(1);
        full.merge(full);
        assert_eq!(full.peers_attempted, u64::MAX);
        assert_eq!(full.peers_exchanged, u64::MAX);
        assert_eq!(full.ranges_diverged, u64::MAX);
        assert_eq!(full.ranges_repaired, u64::MAX);
    }

    #[tokio::test]
    async fn driver_skips_local_peers_in_cycle() {
        // The only peer in the list reports itself as local, so nothing may be attempted.
        let local = Peer::new(
            0,
            PeerEndpoint::tcp("127.0.0.1".into(), 1),
            "r".into(),
            "d".into(),
            vec![DynToken::from_u32(0)],
            true,
            true,
            false,
        );
        let peers = Arc::new(RwLock::new(vec![local]));
        let driver = EntropyDriver::new(material(), empty_source(), peers, Duration::from_secs(60));
        let cycle = driver.run_cycle().await;
        assert_eq!(cycle.peers_attempted, 0);
        assert_eq!(cycle.peers_exchanged, 0);
    }

    #[tokio::test]
    async fn driver_returns_immediately_when_shutdown_already_set() {
        let peers = Arc::new(RwLock::new(Vec::new()));
        let driver = EntropyDriver::new(material(), empty_source(), peers, Duration::from_secs(60));
        let (tx, rx) = watch::channel(true);
        let res =
            tokio::time::timeout(Duration::from_millis(500), driver.run_until_shutdown(rx)).await;
        assert!(res.is_ok(), "driver did not honour pre-set shutdown");
        // Keep the sender alive until here so the driver exits because of the pre-set
        // flag, not because the channel closed.
        drop(tx);
    }
}