use std::collections::{BTreeSet, VecDeque};
use std::ops::ControlFlow;
use crate::node::{Address, FetchResult, FetchResults, NodeId};
use super::{PrivateNetwork, ReplicationFactor};
/// State machine that selects nodes to fetch a repository from and tracks the
/// results until a [`Target`] is reached.
///
/// Typical flow (as exercised by the tests in this file):
/// 1. [`Fetcher::next_node`] hands out the next candidate node.
/// 2. The caller registers that node, together with an [`Address`], via
///    [`Fetcher::ready_to_fetch`].
/// 3. [`Fetcher::next_fetch`] yields queued `(node, address)` pairs.
/// 4. Outcomes are reported with [`Fetcher::fetch_complete`] or
///    [`Fetcher::fetch_failed`].
/// 5. [`Fetcher::finish`] consumes the fetcher and produces a [`FetcherResult`].
#[derive(Debug)]
#[must_use]
pub struct Fetcher {
    /// What counts as a successful fetch process.
    target: Target,
    /// Nodes queued by `ready_to_fetch`, drained by `next_fetch`.
    fetch_from: VecDeque<Ready>,
    /// Nodes not yet handed out by `next_node`; may contain duplicates.
    candidates: VecDeque<Candidate>,
    /// Recorded fetch outcome per node.
    results: FetchResults,
    /// The local node, which is never selected for fetching.
    local_node: NodeId,
}
/// Errors that can occur while constructing a [`Fetcher`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum FetcherError {
    /// The configuration contained no candidate nodes.
    #[error("no candidate seeds were found to fetch from")]
    NoCandidates,
    /// The seeds / replication factor did not form a valid [`Target`].
    #[error(transparent)]
    Target(#[from] TargetError),
}
impl Fetcher {
    /// Create a new [`Fetcher`] from `config`.
    ///
    /// The configured [`ReplicationFactor`] is clamped to the number of
    /// *distinct* candidate nodes, so the replica target never asks for more
    /// nodes than could be fetched from.
    ///
    /// # Errors
    ///
    /// * [`FetcherError::NoCandidates`] if `config` has no candidates.
    /// * [`FetcherError::Target`] if [`Target::new`] rejects the preferred
    ///   seeds together with the (clamped) replication factor.
    pub fn new(config: FetcherConfig) -> Result<Self, FetcherError> {
        if config.candidates.is_empty() {
            return Err(FetcherError::NoCandidates);
        }
        // The candidate queue can hold the same node more than once (e.g. a
        // seed that is also passed to `FetcherConfig::with_candidates`), so
        // clamp by the number of unique nodes, not by queue entries.
        let distinct = config.candidates.iter().collect::<BTreeSet<_>>().len();
        let replicas = config.replicas.min(distinct);
        Ok(Self {
            target: Target::new(config.seeds, replicas)?,
            fetch_from: VecDeque::new(),
            candidates: config.candidates,
            results: FetchResults::default(),
            local_node: config.local_node,
        })
    }

    /// Return the next candidate node to connect to, or `None` once the
    /// candidate queue is exhausted.
    ///
    /// Candidates that already have a recorded result, or that are the local
    /// node, are popped and discarded. A node that appears more than once in
    /// the queue may be returned again if no result was recorded for it in the
    /// meantime.
    pub fn next_node(&mut self) -> Option<NodeId> {
        while let Some(candidate) = self.candidates.pop_front() {
            let node = candidate.nid();
            if self.include_node(&node) {
                return Some(node);
            }
        }
        None
    }

    /// Return the next `(node, address)` pair registered via
    /// [`Fetcher::ready_to_fetch`], or `None` once that queue is exhausted.
    ///
    /// Queued entries whose node already has a recorded result (or is the
    /// local node) are skipped and discarded, rather than ending the drain
    /// early while valid entries remain queued behind them.
    pub fn next_fetch(&mut self) -> Option<(NodeId, Address)> {
        while let Some(Ready { node, addr }) = self.fetch_from.pop_front() {
            if self.include_node(&node) {
                return Some((node, addr));
            }
        }
        None
    }

    /// Record that fetching from `node` failed, with a human-readable `reason`.
    pub fn fetch_failed(&mut self, node: NodeId, reason: impl ToString) {
        let reason = reason.to_string();
        self.results.push(node, FetchResult::Failed { reason })
    }

    /// Record the `result` of fetching from `node`.
    ///
    /// Returns [`ControlFlow::Break`] with a [`Success`] if the target is now
    /// reached, otherwise [`ControlFlow::Continue`] with the current
    /// [`Progress`].
    pub fn fetch_complete(
        &mut self,
        node: NodeId,
        result: FetchResult,
    ) -> ControlFlow<Success, Progress> {
        self.results.push(node, result);
        self.finished()
    }

    /// Consume the fetcher and report whether its target was reached.
    ///
    /// If not, the returned [`TargetMissed`] lists the preferred seeds that
    /// were not fetched from successfully.
    ///
    /// NOTE(review): when the replication factor has an upper bound, the
    /// target only counts as reached once that upper bound is met. A fetch
    /// that satisfied only the lower bound is therefore reported as
    /// `TargetError` with `required_nodes() == 0`. Confirm this is intended.
    pub fn finish(self) -> FetcherResult {
        let progress = self.progress();
        match self.is_target_reached() {
            None => {
                let missing = self.missing_seeds();
                FetcherResult::target_error(progress, self.target, self.results, missing)
            }
            Some(outcome) => FetcherResult::target_reached(outcome, progress, self.results),
        }
    }

    /// Queue `node`, reachable at `addr`, to be returned by
    /// [`Fetcher::next_fetch`].
    pub fn ready_to_fetch(&mut self, node: NodeId, addr: Address) {
        self.fetch_from.push_back(Ready { node, addr })
    }

    /// Snapshot of the current fetch progress.
    pub fn progress(&self) -> Progress {
        let (preferred, succeeded) = self.success_counts();
        Progress {
            candidate: self.candidates.len(),
            succeeded,
            failed: self.results.failed().count(),
            preferred,
        }
    }

    /// The target this fetcher is trying to reach.
    pub fn target(&self) -> &Target {
        &self.target
    }

    /// `Break(Success)` if the target is reached, otherwise
    /// `Continue(progress)`.
    fn finished(&self) -> ControlFlow<Success, Progress> {
        let progress = self.progress();
        self.is_target_reached()
            .map_or(ControlFlow::Continue(progress), |outcome| {
                ControlFlow::Break(Success {
                    outcome,
                    progress,
                    results: self.results.clone(),
                })
            })
    }

    /// Decide whether the target is met.
    ///
    /// Having fetched successfully from every preferred seed takes priority.
    /// Otherwise the replication factor decides: with no upper bound the
    /// lower bound must be met, with an upper bound that bound must be met.
    fn is_target_reached(&self) -> Option<SuccessfulOutcome> {
        let (preferred, succeeded) = self.success_counts();
        if !self.target.seeds.is_empty() && preferred >= self.target.seeds.len() {
            Some(SuccessfulOutcome::PreferredNodes {
                preferred: self.target.seeds.len(),
            })
        } else {
            let replicas = self.target.replicas();
            let min = replicas.lower_bound();
            match replicas.upper_bound() {
                None => (succeeded >= min).then_some(SuccessfulOutcome::MinReplicas { succeeded }),
                Some(max) => (succeeded >= max).then_some(SuccessfulOutcome::MaxReplicas {
                    succeeded,
                    min,
                    max,
                }),
            }
        }
    }

    /// A node is eligible if it has no recorded result and is not the local node.
    fn include_node(&self, node: &NodeId) -> bool {
        self.results.get(node).is_none() && self.local_node != *node
    }

    /// Preferred seeds with no result, or with a non-successful result.
    fn missing_seeds(&self) -> BTreeSet<NodeId> {
        self.target
            .seeds
            .iter()
            .filter(|nid| !matches!(self.results.get(nid), Some(r) if r.is_success()))
            .copied()
            .collect()
    }

    /// Returns `(preferred, succeeded)`: the number of successful fetches from
    /// preferred seeds, and the total number of successful fetches.
    fn success_counts(&self) -> (usize, usize) {
        self.results
            .success()
            .fold((0, 0), |(mut preferred, mut succeeded), (nid, _, _)| {
                succeeded += 1;
                if self.target.seeds.contains(nid) {
                    preferred += 1;
                }
                (preferred, succeeded)
            })
    }
}
/// Snapshot of a fetch process's counters, as computed by `Fetcher::progress`.
#[derive(Clone, Copy, Debug)]
pub struct Progress {
    /// Entries remaining in the candidate queue.
    candidate: usize,
    /// Total successful fetches.
    succeeded: usize,
    /// Total failed fetches.
    failed: usize,
    /// Successful fetches from preferred seeds.
    preferred: usize,
}
impl Progress {
    /// Number of entries still waiting in the candidate queue.
    pub fn candidate(&self) -> usize {
        self.candidate
    }

    /// Number of successful fetches that came from preferred seeds.
    pub fn preferred(&self) -> usize {
        self.preferred
    }

    /// Number of fetches that failed.
    pub fn failed(&self) -> usize {
        self.failed
    }

    /// Number of fetches that succeeded, from any node.
    pub fn succeeded(&self) -> usize {
        self.succeeded
    }
}
/// What a fetch process is aiming for: either success from every preferred
/// seed, or a number of successful replicas bounded by a [`ReplicationFactor`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Target {
    /// Preferred seeds; fetching successfully from all of them reaches the target.
    seeds: BTreeSet<NodeId>,
    /// Replica bounds used when the preferred seeds alone are not satisfied.
    replicas: ReplicationFactor,
}
/// Returned by [`Target::new`] when there is nothing to aim for: no preferred
/// seeds and a replication lower bound of zero.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
#[error("a minimum number of replicas or set of preferred seeds must be provided")]
pub struct TargetError;
impl Target {
pub fn new(seeds: BTreeSet<NodeId>, replicas: ReplicationFactor) -> Result<Self, TargetError> {
if replicas.lower_bound() == 0 && seeds.is_empty() {
Err(TargetError)
} else {
Ok(Self { seeds, replicas })
}
}
pub fn preferred_seeds(&self) -> &BTreeSet<NodeId> {
&self.seeds
}
pub fn replicas(&self) -> &ReplicationFactor {
&self.replicas
}
}
/// Which part of a [`Target`] was satisfied.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum SuccessfulOutcome {
    /// Every preferred seed was fetched from successfully.
    PreferredNodes {
        /// Number of preferred seeds in the target.
        preferred: usize,
    },
    /// No replication upper bound was set, and the lower bound was met.
    MinReplicas {
        /// Total successful fetches.
        succeeded: usize,
    },
    /// A replication upper bound was set and met.
    MaxReplicas {
        /// Total successful fetches.
        succeeded: usize,
        /// Replication lower bound.
        min: usize,
        /// Replication upper bound.
        max: usize,
    },
}
/// A fetch process that reached its [`Target`].
///
/// Derives `Debug` like the other public types in this module, so it can be
/// logged or used in `assert!`/`{:?}` diagnostics.
#[derive(Debug)]
pub struct Success {
    /// Which part of the target was satisfied.
    outcome: SuccessfulOutcome,
    /// Progress snapshot taken when the outcome was determined.
    progress: Progress,
    /// Fetch results recorded up to that point.
    results: FetchResults,
}
impl Success {
pub fn progress(&self) -> Progress {
self.progress
}
pub fn fetch_results(&self) -> &FetchResults {
&self.results
}
pub fn outcome(&self) -> &SuccessfulOutcome {
&self.outcome
}
}
/// A fetch process that ended without reaching its [`Target`].
///
/// Derives `Debug` like the other public types in this module, so failures
/// can be logged with `{:?}`.
#[derive(Debug)]
pub struct TargetMissed {
    /// Progress snapshot at the end of the fetch process.
    progress: Progress,
    /// The target that was missed.
    target: Target,
    /// All recorded fetch results.
    results: FetchResults,
    /// Additional successful fetches that were needed to meet the replication
    /// lower bound (zero if it was already met).
    required: usize,
    /// Preferred seeds that were not fetched from successfully.
    missed_nodes: BTreeSet<NodeId>,
}
impl TargetMissed {
pub fn progress(&self) -> Progress {
self.progress
}
pub fn target(&self) -> &Target {
&self.target
}
pub fn fetch_results(&self) -> &FetchResults {
&self.results
}
pub fn missed_nodes(&self) -> &BTreeSet<NodeId> {
&self.missed_nodes
}
pub fn required_nodes(&self) -> usize {
self.required
}
}
/// Final outcome of a fetch process, as produced by `Fetcher::finish`.
pub enum FetcherResult {
    /// The target was reached.
    TargetReached(Success),
    /// The fetch process ended without reaching the target.
    TargetError(TargetMissed),
}
impl FetcherResult {
    /// Progress snapshot carried by either variant.
    pub fn progress(&self) -> Progress {
        match self {
            Self::TargetError(missed) => missed.progress(),
            Self::TargetReached(success) => success.progress(),
        }
    }

    /// Wrap a successful `outcome` into [`FetcherResult::TargetReached`].
    fn target_reached(
        outcome: SuccessfulOutcome,
        progress: Progress,
        results: FetchResults,
    ) -> Self {
        let success = Success {
            outcome,
            progress,
            results,
        };
        Self::TargetReached(success)
    }

    /// Build [`FetcherResult::TargetError`], computing how many successful
    /// fetches were still needed to meet the replication lower bound.
    fn target_error(
        progress: Progress,
        target: Target,
        results: FetchResults,
        missing: BTreeSet<NodeId>,
    ) -> Self {
        // Saturates at zero once the lower bound has been met.
        let lower_bound = target.replicas.lower_bound();
        let required = lower_bound.saturating_sub(progress.succeeded);
        let missed = TargetMissed {
            progress,
            target,
            results,
            required,
            missed_nodes: missing,
        };
        Self::TargetError(missed)
    }
}
/// Inputs for `Fetcher::new`, built via `FetcherConfig::public` or
/// `FetcherConfig::private`.
pub struct FetcherConfig {
    /// Preferred seeds that form part of the target.
    seeds: BTreeSet<NodeId>,
    /// Requested replication factor (clamped by `Fetcher::new`).
    replicas: ReplicationFactor,
    /// Candidate queue, excluding the local node; may contain duplicates.
    candidates: VecDeque<Candidate>,
    /// The local node, which is never fetched from.
    local_node: NodeId,
}
impl FetcherConfig {
    /// Configuration for a private repository.
    ///
    /// The network's allowed nodes serve both as the preferred seeds and as
    /// the initial candidates (minus `local_node`). This is exactly the public
    /// configuration with `private.allowed` as the seed set.
    pub fn private(
        private: PrivateNetwork,
        replicas: ReplicationFactor,
        local_node: NodeId,
    ) -> Self {
        Self::public(private.allowed, replicas, local_node)
    }

    /// Configuration for a public repository.
    ///
    /// `seeds` become the preferred seeds, and (minus `local_node`) the
    /// initial candidates, in the set's iteration order.
    pub fn public(
        seeds: BTreeSet<NodeId>,
        replicas: ReplicationFactor,
        local_node: NodeId,
    ) -> Self {
        // Iterate by reference; there is no need to clone the whole set.
        let candidates = seeds
            .iter()
            .copied()
            .filter(|node| *node != local_node)
            .map(Candidate::new)
            .collect::<VecDeque<_>>();
        Self {
            seeds,
            replicas,
            candidates,
            local_node,
        }
    }

    /// Append `extra` candidates after the seeds, skipping the local node.
    ///
    /// Duplicates are not removed here.
    pub fn with_candidates(mut self, extra: impl IntoIterator<Item = Candidate>) -> Self {
        self.candidates
            .extend(extra.into_iter().filter(|c| c.nid() != self.local_node));
        self
    }
}
/// A node that may be fetched from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Candidate(NodeId);
impl Candidate {
pub fn new(node: NodeId) -> Self {
Self(node)
}
}
impl Candidate {
fn nid(&self) -> NodeId {
self.0
}
}
/// A node queued by `Fetcher::ready_to_fetch`, with the address to fetch from.
#[derive(Debug)]
struct Ready {
    /// Node to fetch from.
    node: NodeId,
    /// Address supplied for `node` by the caller.
    addr: Address,
}
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use crate::test::arbitrary;

    use super::*;

    /// Seeds are handed out first (in set order), then extra candidates;
    /// nothing is ready to fetch until `ready_to_fetch` is called.
    #[test]
    fn all_nodes_are_candidates() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::default();
        let seeds = arbitrary::set::<NodeId>(3..=6)
            .into_iter()
            .collect::<BTreeSet<_>>();
        let extra_candidates = arbitrary::vec::<NodeId>(3);
        let config = FetcherConfig::public(seeds.clone(), replicas, local)
            .with_candidates(extra_candidates.clone().into_iter().map(Candidate::new));
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        let mut result = Vec::with_capacity(seeds.len() + extra_candidates.len());
        let expected = seeds
            .into_iter()
            .chain(extra_candidates)
            .collect::<Vec<_>>();
        while let Some(node) = fetcher.next_node() {
            result.push(node);
        }
        assert!(fetcher.next_fetch().is_none());
        assert_eq!(result, expected);
    }

    /// A node with a recorded result is not handed out again, and the local
    /// node is never a candidate.
    #[test]
    fn ignores_duplicates_and_local_node() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::default();
        let bob = arbitrary::gen::<NodeId>(1);
        let eve = arbitrary::gen::<NodeId>(2);
        let seeds = [bob].into_iter().collect::<BTreeSet<_>>();
        let extra_candidates = vec![bob, local, eve];
        let config = FetcherConfig::public(seeds.clone(), replicas, local)
            .with_candidates(extra_candidates.clone().into_iter().map(Candidate::new));
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        let mut result = Vec::with_capacity(seeds.len() + extra_candidates.len());
        let expected = vec![bob, eve];
        while let Some(node) = fetcher.next_node() {
            fetcher.fetch_failed(node, "could not connect");
            result.push(node);
        }
        assert_eq!(result, expected);
    }

    /// Every node marked ready is yielded by `next_fetch`, in order.
    #[test]
    fn all_nodes_are_fetchable() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::default();
        let seeds = arbitrary::set::<NodeId>(3..=6)
            .into_iter()
            .collect::<BTreeSet<_>>();
        let extra_candidates = arbitrary::vec::<NodeId>(3);
        let config = FetcherConfig::public(seeds.clone(), replicas, local)
            .with_candidates(extra_candidates.clone().into_iter().map(Candidate::new));
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        let mut result = Vec::with_capacity(seeds.len() + extra_candidates.len());
        let expected = seeds
            .into_iter()
            .chain(extra_candidates)
            .collect::<Vec<_>>();
        while let Some(node) = fetcher.next_node() {
            fetcher.ready_to_fetch(node, arbitrary::gen::<Address>(0));
        }
        while let Some((node, _)) = fetcher.next_fetch() {
            result.push(node);
        }
        assert_eq!(result, expected);
    }

    /// Fetching successfully from every preferred seed reaches the target.
    #[test]
    fn reaches_target_of_preferred_seeds() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::default();
        let seeds = arbitrary::set::<NodeId>(3..=3)
            .into_iter()
            .collect::<BTreeSet<_>>();
        let extra_candidates = arbitrary::vec::<NodeId>(3);
        let config = FetcherConfig::public(seeds.clone(), replicas, local)
            .with_candidates(extra_candidates.clone().into_iter().map(Candidate::new));
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        let mut result = Vec::with_capacity(seeds.len());
        let expected = seeds.into_iter().collect::<Vec<_>>();
        while let Some(node) = fetcher.next_node() {
            fetcher.ready_to_fetch(node, arbitrary::gen::<Address>(0));
            if let Some((node, _)) = fetcher.next_fetch() {
                match fetcher.fetch_complete(
                    node,
                    FetchResult::Success {
                        updated: vec![],
                        namespaces: HashSet::new(),
                        clone: false,
                    },
                ) {
                    ControlFlow::Continue(_) => result.push(node),
                    ControlFlow::Break(success) => {
                        assert_eq!(
                            *success.outcome(),
                            SuccessfulOutcome::PreferredNodes { preferred: 3 }
                        );
                        result.push(node);
                        break;
                    }
                }
            }
        }
        assert_eq!(result, expected);
    }

    /// When every preferred seed fails, meeting the replication lower bound
    /// (with no upper bound) reaches the target.
    #[test]
    fn reaches_target_of_replicas() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::must_reach(3);
        let seeds = arbitrary::set::<NodeId>(3..=3)
            .into_iter()
            .collect::<BTreeSet<_>>();
        let extra_candidates = arbitrary::vec::<NodeId>(3);
        let config = FetcherConfig::public(seeds.clone(), replicas, local)
            .with_candidates(extra_candidates.clone().into_iter().map(Candidate::new));
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        let mut result = Vec::with_capacity(extra_candidates.len());
        let expected = extra_candidates
            .clone()
            .into_iter()
            .take(replicas.lower_bound())
            .collect::<Vec<_>>();
        while let Some(node) = fetcher.next_node() {
            fetcher.ready_to_fetch(node, arbitrary::gen::<Address>(0));
            if let Some((node, _)) = fetcher.next_fetch() {
                if seeds.contains(&node) {
                    fetcher.fetch_failed(node, "failed fetch");
                    continue;
                }
                match fetcher.fetch_complete(
                    node,
                    FetchResult::Success {
                        updated: vec![],
                        namespaces: HashSet::new(),
                        clone: false,
                    },
                ) {
                    ControlFlow::Continue(_) => result.push(node),
                    ControlFlow::Break(success) => {
                        assert_eq!(
                            *success.outcome(),
                            SuccessfulOutcome::MinReplicas { succeeded: 3 }
                        );
                        result.push(node);
                        break;
                    }
                }
            }
        }
        assert_eq!(result, expected);
    }

    /// When every preferred seed fails, meeting the replication upper bound
    /// reaches the target.
    #[test]
    fn reaches_target_of_max_replicas() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::range(1, 3);
        // Six nodes: the first three are preferred seeds (all of which fail),
        // the remaining three are extra candidates that must reach the upper
        // bound. With only three nodes there would be no extra candidates, and
        // the test would pass vacuously without ever reaching `Break`.
        let candidates = arbitrary::set::<NodeId>(6..=6);
        let seeds = candidates.iter().take(3).copied().collect::<BTreeSet<_>>();
        let extra_candidates = candidates.into_iter().skip(3).collect::<Vec<_>>();
        let config = FetcherConfig::public(seeds.clone(), replicas, local)
            .with_candidates(extra_candidates.clone().into_iter().map(Candidate::new));
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        let mut result = Vec::with_capacity(extra_candidates.len());
        let expected = extra_candidates
            .clone()
            .into_iter()
            .take(replicas.upper_bound().expect("replicas must have max"))
            .collect::<Vec<_>>();
        while let Some(node) = fetcher.next_node() {
            fetcher.ready_to_fetch(node, arbitrary::gen::<Address>(0));
            if let Some((node, _)) = fetcher.next_fetch() {
                if seeds.contains(&node) {
                    fetcher.fetch_failed(node, "could not connect");
                    continue;
                }
                match fetcher.fetch_complete(
                    node,
                    FetchResult::Success {
                        updated: vec![],
                        namespaces: HashSet::new(),
                        clone: false,
                    },
                ) {
                    ControlFlow::Continue(_) => result.push(node),
                    ControlFlow::Break(success) => {
                        assert_eq!(
                            *success.outcome(),
                            SuccessfulOutcome::MaxReplicas {
                                succeeded: 3,
                                min: 1,
                                max: 3
                            }
                        );
                        result.push(node);
                        break;
                    }
                }
            }
        }
        assert_eq!(
            result,
            expected,
            "expected {} seed(s), found {}",
            expected.len(),
            result.len(),
        );
    }

    /// When both targets are met by the same fetches, the preferred-seeds
    /// outcome wins.
    #[test]
    fn preferred_seeds_target_returned_over_replicas() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::range(1, 3);
        let candidates = arbitrary::set::<NodeId>(3..=3);
        let seeds = candidates.into_iter().collect::<BTreeSet<_>>();
        let config = FetcherConfig::public(seeds.clone(), replicas, local);
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        let mut result = Vec::with_capacity(seeds.len());
        while let Some(node) = fetcher.next_node() {
            fetcher.ready_to_fetch(node, arbitrary::gen::<Address>(0));
            if let Some((node, _)) = fetcher.next_fetch() {
                match fetcher.fetch_complete(
                    node,
                    FetchResult::Success {
                        updated: vec![],
                        namespaces: HashSet::new(),
                        clone: false,
                    },
                ) {
                    ControlFlow::Continue(_) => result.push(node),
                    ControlFlow::Break(success) => {
                        assert_eq!(
                            *success.outcome(),
                            SuccessfulOutcome::PreferredNodes { preferred: 3 }
                        );
                        result.push(node);
                        break;
                    }
                }
            }
        }
        assert_eq!(result, seeds.into_iter().collect::<Vec<_>>());
    }

    /// If every fetch fails, `finish` reports a `TargetError`.
    #[test]
    fn could_not_reach_target() {
        let local = arbitrary::gen::<NodeId>(0);
        let replicas = ReplicationFactor::must_reach(4);
        let candidates = arbitrary::set::<NodeId>(3..=3);
        let seeds = candidates.into_iter().collect::<BTreeSet<_>>();
        let config = FetcherConfig::public(seeds.clone(), replicas, local);
        let mut fetcher = Fetcher::new(config).expect("fetcher should be constructed correctly");
        while let Some(node) = fetcher.next_node() {
            fetcher.ready_to_fetch(node, arbitrary::gen::<Address>(0));
            if let Some((node, _)) = fetcher.next_fetch() {
                fetcher.fetch_failed(node, "could not connect");
            }
        }
        let result = fetcher.finish();
        assert!(matches!(result, FetcherResult::TargetError(_)));
    }
}