use std::collections::VecDeque;
use std::num::NonZeroUsize;
pub trait ClusterLogEntry<N> {
fn concurrency(&self) -> Option<NonZeroUsize>;
fn added_nodes(&self) -> Vec<N>;
fn removed_nodes(&self) -> Vec<N>;
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Cluster<N> {
nodes_at_offset: Vec<N>,
concurrency_at_offset: NonZeroUsize,
target_concurrency: NonZeroUsize,
log_entries: VecDeque<(NonZeroUsize, Vec<N>, Vec<N>)>,
}
impl<N, I> Cluster<N>
where
N: crate::NodeInfo<Id = I> + Clone,
I: crate::Identifier + Ord,
{
pub fn new(nodes: Vec<N>, concurrency: NonZeroUsize) -> Self {
Cluster {
nodes_at_offset: nodes,
concurrency_at_offset: concurrency,
target_concurrency: concurrency,
log_entries: VecDeque::new(),
}
}
pub fn apply<E: ClusterLogEntry<N>>(&mut self, log_entry: &E) {
let new_concurrency = log_entry.concurrency();
let mut added_nodes = log_entry.added_nodes();
let mut removed_nodes = log_entry.removed_nodes();
if !added_nodes.is_empty() && !removed_nodes.is_empty() {
added_nodes.sort_by_key(|n| n.id());
added_nodes.dedup_by_key(|n| n.id());
removed_nodes.sort_by_key(|n| n.id());
removed_nodes.dedup_by_key(|n| n.id());
let any_node_added_and_removed = added_nodes.iter().any(|a| {
removed_nodes
.binary_search_by_key(&a.id(), |n| n.id())
.is_ok()
});
assert!(
!any_node_added_and_removed,
"Simultaneously adding and removing a node is undefined."
);
}
let prev = self
.log_entries
.back()
.map(|(c, _, _)| *c)
.unwrap_or(self.concurrency_at_offset);
if self.target_concurrency < prev {
let mut compactable = self
.log_entries
.iter_mut()
.rev()
.enumerate()
.take_while(|(i, (c, _, _))| usize::from(prev) + i == usize::from(*c))
.last()
.map(|(_, (_, a, r))| (a, r));
if let Some((ref mut a, ref mut r)) = compactable {
if !removed_nodes.is_empty() {
a.retain(|n| {
removed_nodes
.binary_search_by_key(&n.id(), |n| n.id())
.is_err()
});
}
let mut added = Vec::new();
std::mem::swap(&mut added, &mut added_nodes);
if !added.is_empty() {
a.append(&mut added);
a.sort_by_key(|n| n.id());
a.dedup_by_key(|n| n.id());
}
let mut removed = Vec::new();
std::mem::swap(&mut removed, &mut removed_nodes);
if !removed.is_empty() {
r.append(&mut removed);
r.sort_by_key(|n| n.id());
r.dedup_by_key(|n| n.id());
}
}
}
if let Some(new_concurrency) = new_concurrency {
if new_concurrency > self.target_concurrency {
self.log_entries
.iter_mut()
.rev()
.enumerate()
.take_while(|(i, (c, _, _))| usize::from(*c) > i + 1)
.for_each(|(_, (c, _, _))| *c = new_concurrency);
}
self.target_concurrency = new_concurrency;
}
let next = NonZeroUsize::new(std::cmp::max::<usize>(
self.target_concurrency.into(),
usize::from(prev) - 1,
))
.unwrap();
self.log_entries
.push_back((next, added_nodes, removed_nodes));
while self.log_entries.len() >= self.concurrency_at_offset.into() {
let (c, added, removed) = self.log_entries.pop_front().unwrap();
self.concurrency_at_offset = c;
if !added.is_empty() {
self.nodes_at_offset.extend(added);
self.nodes_at_offset.sort_by_key(|n| n.id());
self.nodes_at_offset.dedup_by_key(|n| n.id());
}
self.nodes_at_offset
.retain(|n| removed.binary_search_by_key(&n.id(), |n| n.id()).is_err());
}
}
pub fn concurrency_at_offset_one(&self) -> NonZeroUsize {
self.log_entries
.back()
.map(|(c, _, _)| *c)
.unwrap_or(self.concurrency_at_offset)
}
pub fn nodes_at_offset_one(&self) -> Vec<N> {
self.nodes_at(NonZeroUsize::new(1).unwrap()).unwrap()
}
pub fn nodes_at(&self, round_offset: NonZeroUsize) -> Option<Vec<N>> {
if round_offset > self.concurrency_at_offset_one() {
return None;
}
let round_offset: usize = round_offset.into();
let concurrency: usize = self.concurrency_at_offset.into();
let take = round_offset - 1;
let take = take - std::cmp::min(take, concurrency - self.log_entries.len() - 1);
let take = std::cmp::min(take, self.log_entries.len());
let mut nodes = self.log_entries.iter().take(take).fold(
self.nodes_at_offset.clone(),
|mut aggregate, (_m, added, removed)| {
aggregate.extend(added.iter().cloned());
aggregate.retain(|n| removed.binary_search_by_key(&n.id(), |n| n.id()).is_err());
aggregate
},
);
nodes.sort_by_key(|n| n.id());
nodes.dedup_by_key(|n| n.id());
Some(nodes)
}
}
#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;
use super::Cluster;
fn n(n: usize) -> NonZeroUsize {
NonZeroUsize::new(n).unwrap()
}
impl<N: Clone> super::ClusterLogEntry<N> for (Option<Vec<N>>, Option<Vec<N>>, Option<usize>) {
fn concurrency(&self) -> Option<NonZeroUsize> {
self.2.map(NonZeroUsize::new).map(Option::unwrap)
}
fn added_nodes(&self) -> Vec<N> {
self.0.clone().unwrap_or_default()
}
fn removed_nodes(&self) -> Vec<N> {
self.1.clone().unwrap_or_default()
}
}
impl crate::NodeInfo for usize {
type Id = usize;
fn id(&self) -> Self::Id {
*self
}
}
#[test]
fn test_initial_cluster() {
let n1 = 1;
let n2 = 2;
let cluster = Cluster::new(vec![n1, n2], n(1));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(2)), None);
}
#[test]
fn test_add_node_non_concurrently() {
let n1 = 1;
let n2 = 2;
let mut cluster = Cluster::new(vec![n1], n(1));
cluster.apply(&(Some(vec![n2]), None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1, n2]));
}
#[test]
fn test_add_node_concurrently() {
let n1 = 1;
let n2 = 2;
let mut cluster = Cluster::new(vec![n1], n(2));
cluster.apply(&(Some(vec![n2]), None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(3)), None);
cluster.apply(&(None, None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(3)), None);
}
#[test]
fn test_remove_node_non_concurrently() {
let n1 = 1;
let n2 = 2;
let mut cluster = Cluster::new(vec![n1, n2], n(1));
cluster.apply(&(None, Some(vec![n2]), None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
}
#[test]
fn test_remove_node_concurrently() {
let n1 = 1;
let n2 = 2;
let mut cluster = Cluster::new(vec![n1, n2], n(2));
cluster.apply(&(None, Some(vec![n2]), None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), None);
cluster.apply(&(None, None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), None);
}
#[test]
fn test_concurrency() {
let n1 = 1;
let n2 = 2;
let mut cluster = Cluster::new(vec![n1], n(5));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(5)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(6)), None);
cluster.apply(&(Some(vec![n2]), None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(5)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(6)), None);
}
#[test]
fn test_increase_concurrency() {
let n1 = 1;
let n2 = 2;
let mut cluster = Cluster::new(vec![n1], n(3));
cluster.apply(&(Some(vec![n2]), None, None));
cluster.apply(&(None, None, Some(5)));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(5)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(6)), None);
cluster.apply(&(None, Some(vec![n1]), None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1, n2]));
assert_eq!(cluster.nodes_at(n(5)), Some(vec![n2]));
assert_eq!(cluster.nodes_at(n(6)), None);
}
#[test]
fn test_decrease_concurrency_once() {
let n1 = 1;
let n2 = 2;
let n3 = 3;
let mut cluster = Cluster::new(vec![n1], n(5));
cluster.apply(&(None, None, Some(1)));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(5)), None);
cluster.apply(&(Some(vec![n2]), None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), None);
cluster.apply(&(Some(vec![n3]), None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), None);
cluster.apply(&(None, Some(vec![n1]), None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), None);
cluster.apply(&(None, None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n2, n3]));
assert_eq!(cluster.nodes_at(n(2)), None);
}
#[test]
fn test_decrease_concurrency_twice() {
let n1 = 1;
let n2 = 2;
let n3 = 3;
let mut cluster = Cluster::new(vec![n1], n(5));
cluster.apply(&(None, None, Some(3)));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(5)), None);
cluster.apply(&(Some(vec![n2]), None, Some(1)));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), None);
cluster.apply(&(Some(vec![n3]), None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), None);
cluster.apply(&(None, Some(vec![n1]), None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), None);
cluster.apply(&(None, None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n2, n3]));
assert_eq!(cluster.nodes_at(n(2)), None);
}
#[test]
fn test_decrease_concurrency_then_increase_again() {
let n1 = 1;
let n2 = 2;
let n3 = 3;
let mut cluster = Cluster::new(vec![n1], n(5));
cluster.apply(&(None, None, Some(1)));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(5)), None);
cluster.apply(&(Some(vec![n2]), None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(4)), None);
cluster.apply(&(Some(vec![n3]), None, Some(5)));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(5)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(6)), None);
cluster.apply(&(None, Some(vec![n1]), None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(5)), Some(vec![n2, n3]));
assert_eq!(cluster.nodes_at(n(6)), None);
cluster.apply(&(None, None, None));
assert_eq!(cluster.nodes_at(n(1)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(2)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(3)), Some(vec![n1, n2, n3]));
assert_eq!(cluster.nodes_at(n(4)), Some(vec![n2, n3]));
assert_eq!(cluster.nodes_at(n(5)), Some(vec![n2, n3]));
assert_eq!(cluster.nodes_at(n(6)), None);
}
}