use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
use garage_util::error::*;
use crate::ring::*;
/// Versioned description of which node holds which role in the cluster,
/// together with the partition-to-node assignation derived from those roles
/// and a set of staged (not yet applied) role changes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClusterLayout {
/// Layout version. `merge` compares it to decide which layout wins, and it
/// is incremented each time staged changes are applied or reverted.
pub version: u64,
/// Number of nodes assigned to every partition.
pub replication_factor: usize,
/// Roles currently in effect, keyed by node id.
pub roles: LwwMap<Uuid, NodeRoleV>,
/// Node ids that `ring_assignation_data` refers to by index. When produced
/// by `compute_assignation_data`, nodes with a capacity come first and
/// nodes without one (gateways) are appended after them.
pub node_id_vec: Vec<Uuid>,
/// Flattened assignation: `replication_factor` consecutive entries per
/// partition, each entry being an index into `node_id_vec`. A valid layout
/// has `(1 << PARTITION_BITS) * replication_factor` entries.
#[serde(with = "serde_bytes")]
pub ring_assignation_data: Vec<CompactNodeType>,
/// Role changes waiting to be applied; merged into `roles` by
/// `apply_staged_changes`.
pub staging: LwwMap<Uuid, NodeRoleV>,
/// Hash (`blake2sum`) of the serialized `staging` map, recomputed whenever
/// `staging` is modified by the methods of this type.
pub staging_hash: Hash,
}
/// Value type of the role maps: an optional [`NodeRole`].
///
/// A `None` value marks a node that has no role: such entries are dropped
/// from `ClusterLayout::roles` when staged changes are applied and are
/// ignored when listing the configured nodes.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
// NOTE(review): the meaning of `WARN_IF_DIFFERENT` is defined by `AutoCrdt` in
// `garage_util::crdt`; presumably it makes the CRDT merge emit a warning when
// two different values are merged — confirm against that trait.
impl AutoCrdt for NodeRoleV {
const WARN_IF_DIFFERENT: bool = true;
}
/// The role assigned to a single node of the cluster.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRole {
/// Name of the zone the node belongs to. The partition assignation tries
/// to place the copies of a partition in distinct zones.
pub zone: String,
/// Relative storage capacity of the node. `None` designates a gateway:
/// such a node is never assigned any partition.
pub capacity: Option<u32>,
/// Free-form tags attached to the node; not interpreted by this file.
pub tags: Vec<String>,
}
impl NodeRole {
    /// Renders the capacity for display: the decimal number for a node that
    /// has a capacity, or the literal `"gateway"` for a node that has none.
    pub fn capacity_string(&self) -> String {
        self.capacity
            .map_or_else(|| "gateway".to_string(), |cap| cap.to_string())
    }
}
impl ClusterLayout {
pub fn new(replication_factor: usize) -> Self {
let empty_lwwmap = LwwMap::new();
let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
ClusterLayout {
version: 0,
replication_factor,
roles: LwwMap::new(),
node_id_vec: Vec::new(),
ring_assignation_data: Vec::new(),
staging: empty_lwwmap,
staging_hash: empty_lwwmap_hash,
}
}
/// Merges `other` into this layout and reports whether anything changed.
///
/// * If `other` has a strictly greater version, it replaces `self` entirely.
/// * If `other` has a strictly lower version, it is ignored.
/// * If both have the same version, only the staging maps are merged and
///   the staging hash is refreshed; the return value then tells whether the
///   hash is different from what it was before the merge.
pub fn merge(&mut self, other: &ClusterLayout) -> bool {
    if other.version > self.version {
        *self = other.clone();
        return true;
    }
    if other.version < self.version {
        return false;
    }

    // Same version: only the staged changes can differ.
    self.staging.merge(&other.staging);
    let merged_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
    let changed = merged_hash != self.staging_hash;
    self.staging_hash = merged_hash;
    changed
}
pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
match version {
None => {
let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
return Err(Error::Message(error.into()));
}
Some(v) => {
if v != self.version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
}
}
self.roles.merge(&self.staging);
self.roles.retain(|(_, _, v)| v.0.is_some());
if !self.calculate_partition_assignation() {
return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
}
self.staging.clear();
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
self.version += 1;
Ok(self)
}
/// Discards the staged role changes and returns the resulting layout.
///
/// `version` must be `Some(self.version + 1)`. On success the staging area is
/// empty, its hash has been refreshed and the version has been incremented;
/// `roles` and the partition assignation are left untouched.
///
/// # Errors
///
/// Returns `Error::Message` when `version` is missing or is not
/// `self.version + 1`.
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
    let requested = match version {
        Some(v) => v,
        None => {
            let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
            return Err(Error::Message(error.into()));
        }
    };
    if requested != self.version + 1 {
        return Err(Error::Message("Invalid new layout version".into()));
    }

    self.staging.clear();
    self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
    self.version += 1;

    Ok(self)
}
/// Returns the ids listed in `node_id_vec`, in storage order.
pub fn node_ids(&self) -> &[Uuid] {
    self.node_id_vec.as_slice()
}
/// Returns how many node ids are listed in `node_id_vec`.
pub fn num_nodes(&self) -> usize {
    self.node_ids().len()
}
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
match self.roles.get(node) {
Some(NodeRoleV(Some(v))) => Some(v),
_ => None,
}
}
pub fn check(&self) -> bool {
let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
if staging_hash != self.staging_hash {
return false;
}
let mut expected_nodes = self
.roles
.items()
.iter()
.filter(|(_, _, v)| v.0.is_some())
.map(|(id, _, _)| *id)
.collect::<Vec<_>>();
expected_nodes.sort();
let mut node_id_vec = self.node_id_vec.clone();
node_id_vec.sort();
if expected_nodes != node_id_vec {
return false;
}
if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor {
return false;
}
for x in self.ring_assignation_data.iter() {
if *x as usize >= self.node_id_vec.len() {
return false;
}
let node = self.node_id_vec[*x as usize];
match self.roles.get(&node) {
Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
_ => return false,
}
}
true
}
/// Recomputes the assignation of partitions to nodes from the current
/// `roles`, trying to stay close to the previous assignation, and stores the
/// result in `node_id_vec` and `ring_assignation_data`.
///
/// Steps:
/// 1. start from the previous assignation, keeping only nodes that still
///    have a role with a capacity;
/// 2. complete every partition up to `replication_factor` nodes using the
///    assignation produced by `initial_partition_assignation`;
/// 3. compute, for every node, a target number of partitions proportional
///    to its capacity;
/// 4. repeatedly apply the single best "replace one node by another in one
///    partition" move, until no move reduces the imbalance any further;
/// 5. print a summary on standard output and store the new assignation.
///
/// Returns `false` (leaving the layout untouched) when
/// `initial_partition_assignation` cannot produce an assignation, and `true`
/// otherwise.
///
/// # Panics
///
/// Panics if one of the internal consistency assertions fails (for instance
/// if a partition cannot be completed to `replication_factor` nodes).
pub fn calculate_partition_assignation(&mut self) -> bool {
    let (configured_nodes, zones) = self.configured_nodes_and_zones();
    let n_zones = zones.len();

    println!("Calculating updated partition assignation, this may take some time...");
    println!();

    let old_partitions = self.parse_assignation_data();

    // Step 1: carry over, from the previous assignation, the nodes that are
    // still able to store data.
    let mut partitions = old_partitions
        .iter()
        .map(|old_part| {
            let mut new_part = PartitionAss::new();
            for node in old_part.nodes.iter() {
                if let Some(role) = node.1 {
                    if role.capacity.is_some() {
                        new_part.add(None, n_zones, node.0, role);
                    }
                }
            }
            new_part
        })
        .collect::<Vec<_>>();

    // Step 2: fill the holes with nodes taken from a fresh assignation. Two
    // passes are made over the candidate nodes of each partition, because a
    // node rejected on the first pass may be accepted once others are in.
    match self.initial_partition_assignation() {
        Some(initial_partitions) => {
            for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
                for _ in 0..2 {
                    for (id, info) in ipart.nodes.iter() {
                        if part.nodes.len() < self.replication_factor {
                            part.add(None, n_zones, id, info.unwrap());
                        }
                    }
                }
                assert!(part.nodes.len() == self.replication_factor);
            }
        }
        None => {
            return false;
        }
    }

    // Step 3: per-node targets, proportional to capacity.
    //
    // The capacities are widened to `usize` *before* being summed: summing
    // them as `u32` overflows as soon as the total exceeds `u32::MAX`, which
    // panics in debug builds and silently yields wrong targets in release
    // builds.
    let total_capacity = configured_nodes
        .iter()
        .map(|(_, info)| info.capacity.unwrap_or(0) as usize)
        .sum::<usize>();

    let total_partitions = self.replication_factor * (1 << PARTITION_BITS);
    let target_partitions_per_node = configured_nodes
        .iter()
        .map(|(id, info)| {
            (
                *id,
                info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity,
            )
        })
        .collect::<HashMap<&Uuid, usize>>();

    let mut partitions_per_node = self.partitions_per_node(&partitions[..]);

    println!("Target number of partitions per node:");
    for (node, npart) in target_partitions_per_node.iter() {
        println!("{:?}\t{}", node, npart);
    }
    println!();

    // Step 4: greedy rebalancing. Each iteration scans every possible
    // (partition, node to remove, node to add) triple, keeps the one with the
    // largest gain and applies it; the loop stops when no move has a gain.
    loop {
        let mut option = None;
        for (i, part) in partitions.iter_mut().enumerate() {
            for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
                // Relative deviation of a node from its target count.
                let errratio = |node, parts| {
                    let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
                    (parts - tgt) / tgt
                };
                let square = |x| x * x;

                let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;

                for (idadd, infoadd) in configured_nodes.iter() {
                    // Only nodes with a capacity, other than the one being
                    // removed, can receive the partition.
                    if idadd == idrm || infoadd.capacity.is_none() {
                        continue;
                    }

                    // Cost = squared difference between the relative
                    // deviations of the two nodes, before and after the move.
                    let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
                    let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
                    let newcost =
                        square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
                    if newcost >= oldcost {
                        continue;
                    }
                    let gain = oldcost - newcost;

                    // The move must respect the placement rules of `add`...
                    let mut newpart = part.clone();
                    newpart.nodes.remove(irm);
                    if !newpart.add(None, n_zones, idadd, infoadd) {
                        continue;
                    }
                    assert!(newpart.nodes.len() == self.replication_factor);

                    // ...and must not drop too many nodes of the partition
                    // compared to the previous assignation.
                    if !old_partitions[i]
                        .is_valid_transition_to(&newpart, self.replication_factor)
                    {
                        continue;
                    }

                    if option
                        .as_ref()
                        .map(|(old_gain, _, _, _, _)| gain > *old_gain)
                        .unwrap_or(true)
                    {
                        option = Some((gain, i, idadd, idrm, newpart));
                    }
                }
            }
        }
        if let Some((_gain, i, idadd, idrm, newpart)) = option {
            *partitions_per_node.entry(idadd).or_insert(0) += 1;
            *partitions_per_node.get_mut(idrm).unwrap() -= 1;
            partitions[i] = newpart;
        } else {
            break;
        }
    }

    // Sanity checks on the result.
    assert!(partitions.len() == (1 << PARTITION_BITS));
    assert!(partitions
        .iter()
        .all(|p| p.nodes.len() == self.replication_factor));

    let new_partitions_per_node = self.partitions_per_node(&partitions[..]);
    assert!(new_partitions_per_node == partitions_per_node);

    // Step 5: report what was computed.
    println!("New number of partitions per node:");
    for (node, npart) in partitions_per_node.iter() {
        let tgt = *target_partitions_per_node.get(node).unwrap();
        let pct = 100f32 * (*npart as f32) / (tgt as f32);
        println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
    }
    println!();

    // Group the partitions by the set of nodes they lose / gain, so that the
    // report shows one line per kind of data movement.
    let mut diffcount = HashMap::new();
    for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) {
        let nminus = oldpart.txtplus(newpart);
        let nplus = newpart.txtplus(oldpart);
        if nminus != "[...]" || nplus != "[...]" {
            let tup = (nminus, nplus);
            *diffcount.entry(tup).or_insert(0) += 1;
        }
    }
    if diffcount.is_empty() {
        println!("No data will be moved between nodes.");
    } else {
        let mut diffcount = diffcount.into_iter().collect::<Vec<_>>();
        diffcount.sort();
        println!("Number of partitions that move:");
        for ((nminus, nplus), npart) in diffcount {
            println!("\t{}\t{} -> {}", npart, nminus, nplus);
        }
    }
    println!();

    // Store the new assignation in its compact form.
    let (nodes, assignation_data) =
        self.compute_assignation_data(&configured_nodes[..], &partitions[..]);

    self.node_id_vec = nodes;
    self.ring_assignation_data = assignation_data;

    true
}
/// Builds an assignation of partitions to nodes from scratch, using only the
/// current roles (the previous assignation is not consulted).
///
/// Nodes are added to partitions one replica rank at a time. For each rank,
/// every node with a capacity proposes itself for partitions in an order
/// derived from a hash of (partition index, node id), and gets a number of
/// attempts per pass equal to its capacity. Placement rules (zone spreading,
/// no duplicate node) are enforced by `PartitionAss::add`.
///
/// Returns `None` when a full pass over all nodes fails to place a single
/// node while some partitions still lack a node for the current rank.
fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> {
let (configured_nodes, zones) = self.configured_nodes_and_zones();
let n_zones = zones.len();
// One (initially empty) assignation per partition index.
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
let mut partitions: Vec<PartitionAss> = partitions_idx
.iter()
.map(|_i| PartitionAss::new())
.collect::<Vec<_>>();
// For every node with a capacity, build its queue: all partition indices,
// ordered by the hash of (big-endian partition index ++ node id). The last
// tuple field is a cursor into that queue.
let mut queues = configured_nodes
.iter()
.filter(|(_id, info)| info.capacity.is_some())
.map(|(node_id, node_info)| {
let mut parts = partitions_idx
.iter()
.map(|i| {
let part_data =
[&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
(*i, fasthash(&part_data[..]))
})
.collect::<Vec<_>>();
parts.sort_by_key(|(_i, h)| *h);
let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
(node_id, node_info, parts_i, 0)
})
.collect::<Vec<_>>();
// Largest capacity among the nodes (0 when no node has a capacity); it
// bounds the number of rounds in one pass below.
let max_capacity = configured_nodes
.iter()
.filter_map(|(_, node_info)| node_info.capacity)
.fold(0, std::cmp::max);
// Fill replica rank `rep` of every partition before moving to the next rank.
for rep in 0..self.replication_factor {
// Visit the nodes in an order that depends on the rank, so that the same
// node is not systematically served first.
queues.sort_by_key(|(ni, _np, _q, _p)| {
let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
fasthash(&queue_data[..])
});
// Restart every queue from its beginning for this rank.
for (_, _, _, pos) in queues.iter_mut() {
*pos = 0;
}
// Number of partitions still lacking a node for this rank.
let mut remaining = partitions_idx.len();
while remaining > 0 {
let remaining0 = remaining;
// One pass: a node of capacity `c` takes part in rounds 0..c, hence gets
// up to `c` placements per pass.
for i_round in 0..max_capacity {
for (node_id, node_info, q, pos) in queues.iter_mut() {
if i_round >= node_info.capacity.unwrap() {
continue;
}
// Take the first partition, from the cursor onwards, that accepts this
// node as its `rep + 1`-th member, then move the cursor past it.
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
remaining -= 1;
*pos = pos2 + 1;
break;
}
}
}
}
// A whole pass without any placement means no assignation can be found.
if remaining == remaining0 {
return None;
}
}
}
Some(partitions)
}
/// Lists the nodes that currently have a role, together with the set of zone
/// names used by those of them that have a capacity.
///
/// The node list follows the iteration order of `roles` and includes nodes
/// without a capacity; the zone set only counts nodes with a capacity.
fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) {
    let mut configured_nodes: Vec<(&Uuid, &NodeRole)> = Vec::new();
    let mut zones: HashSet<&str> = HashSet::new();

    for (id, _, role) in self.roles.items().iter() {
        if let Some(info) = role.0.as_ref() {
            configured_nodes.push((id, info));
            if info.capacity.is_some() {
                zones.insert(info.zone.as_str());
            }
        }
    }

    (configured_nodes, zones)
}
/// Converts a list of per-partition assignations into the compact form
/// stored in the layout.
///
/// Returns the list of node ids (nodes with a capacity first, in the order of
/// `configured_nodes`, followed by the nodes without one) and the flattened
/// assignation, where every node is replaced by its index in that list.
///
/// # Panics
///
/// Panics if `partitions` does not hold `1 << PARTITION_BITS` entries, if a
/// partition does not have exactly `replication_factor` nodes, or if a
/// partition refers to a node that has no capacity in `configured_nodes`.
fn compute_assignation_data<'a>(
    &self,
    configured_nodes: &[(&'a Uuid, &'a NodeRole)],
    partitions: &[PartitionAss<'a>],
) -> (Vec<Uuid>, Vec<CompactNodeType>) {
    assert!(partitions.len() == (1 << PARTITION_BITS));

    // Split the configured nodes between those that store data and the rest.
    let mut storage_ids = Vec::new();
    let mut gateway_ids = Vec::new();
    for (id, info) in configured_nodes.iter() {
        if info.capacity.is_some() {
            storage_ids.push(**id);
        } else {
            gateway_ids.push(**id);
        }
    }

    // Reverse lookup: node id -> compact index.
    let mut index_of = HashMap::<Uuid, CompactNodeType>::new();
    for (i, id) in storage_ids.iter().enumerate() {
        index_of.insert(*id, i as CompactNodeType);
    }

    let mut assignation_data = vec![];
    for partition in partitions.iter() {
        assert!(partition.nodes.len() == self.replication_factor);
        assignation_data.extend(
            partition
                .nodes
                .iter()
                .map(|(id, _)| *index_of.get(*id).unwrap()),
        );
    }

    // Nodes without a capacity go last, so that they never get an index that
    // is referenced from the assignation.
    storage_ids.extend(gateway_ids);

    (storage_ids, assignation_data)
}
fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> {
if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) {
let mut partitions = vec![];
for i in 0..(1 << PARTITION_BITS) {
let mut part = PartitionAss::new();
for node_i in self.ring_assignation_data
[i * self.replication_factor..(i + 1) * self.replication_factor]
.iter()
{
let node_id = &self.node_id_vec[*node_i as usize];
if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) {
part.nodes.push((node_id, Some(info)));
} else {
part.nodes.push((node_id, None));
}
}
partitions.push(part);
}
partitions
} else {
(0..(1 << PARTITION_BITS))
.map(|_| PartitionAss::new())
.collect()
}
}
/// Counts, for every node, in how many of the given partitions it appears.
/// Nodes that appear in no partition are absent from the returned map.
fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> {
    partitions
        .iter()
        .flat_map(|part| part.nodes.iter())
        .fold(HashMap::<&Uuid, usize>::new(), |mut counts, (id, _)| {
            *counts.entry(*id).or_insert(0) += 1;
            counts
        })
}
}
/// Working representation of the nodes assigned to one partition while an
/// assignation is being computed.
#[derive(Clone)]
struct PartitionAss<'a> {
/// Assigned nodes, each with its role. The role is `None` only for entries
/// built by `parse_assignation_data` for nodes that no longer have a role.
nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>,
}
impl<'a> PartitionAss<'a> {
fn new() -> Self {
Self { nodes: Vec::new() }
}
/// Counts the entries of `self` that are not present in `other`.
fn nplus(&self, other: &PartitionAss<'a>) -> usize {
    let mut only_here = 0;
    for entry in self.nodes.iter() {
        if !other.nodes.contains(entry) {
            only_here += 1;
        }
    }
    only_here
}
/// Describes, as text, the nodes of `self` that are not present in `other`.
///
/// The result is a bracketed, space-separated, sorted list of the debug
/// representations of those node ids; a trailing `...` item is added when
/// `self` and `other` have at least one entry in common.
fn txtplus(&self, other: &PartitionAss<'a>) -> String {
    let mut labels = Vec::new();
    let mut has_common = false;

    for entry in self.nodes.iter() {
        if other.nodes.contains(entry) {
            has_common = true;
        } else {
            labels.push(format!("{:?}", entry.0));
        }
    }

    labels.sort();
    if has_common {
        labels.push("...".into());
    }

    format!("[{}]", labels.join(" "))
}
/// Tells whether going from `self` to `other` keeps enough of the nodes of
/// `self`.
///
/// At least `(replication_factor + 1) / 2` nodes of `self` must remain in
/// `other`; when `self` has no more nodes than that, none may be removed.
fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool {
    let must_keep = (replication_factor + 1) / 2;
    // Saturates to 0 when `self` is already at or below the minimum.
    let removable = self.nodes.len().saturating_sub(must_keep);
    self.nplus(other) <= removable
}
fn add(
&mut self,
target_len: Option<usize>,
n_zones: usize,
node: &'a Uuid,
role: &'a NodeRole,
) -> bool {
if let Some(tl) = target_len {
if self.nodes.len() != tl - 1 {
return false;
}
}
let p_zns = self
.nodes
.iter()
.map(|(_id, info)| info.unwrap().zone.as_str())
.collect::<HashSet<&str>>();
if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str()))
|| (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node))
{
self.nodes.push((node, Some(role)));
true
} else {
false
}
}
}