use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
use bytesize::ByteSize;
use itertools::Itertools;
use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap};
use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use crate::graph_algo::*;
use crate::ring::*;
use std::convert::TryInto;
/// Total number of partitions in the ring: `2^PARTITION_BITS`.
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
/// Human-readable report produced while computing a layout, one entry per output line.
type Message = Vec<String>;
/// Oldest layout format known to this file. It implements `InitialFormat`, i.e. it is the
/// starting point of the migration chain that `v09` builds upon.
mod v08 {
use crate::ring::CompactNodeType;
use garage_util::crdt::LwwMap;
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
/// Cluster layout as stored in the `v08` format.
///
/// The field order is part of the serialized representation; do not reorder.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClusterLayout {
/// Version number of the layout.
pub version: u64,
/// Number of copies stored for each partition.
pub replication_factor: usize,
/// Currently applied role of each node.
pub roles: LwwMap<Uuid, NodeRoleV>,
/// Table translating the compact node ids used in `ring_assignation_data` into UUIDs.
pub node_id_vec: Vec<Uuid>,
/// Flat list of compact node ids describing the partition assignment.
#[serde(with = "serde_bytes")]
pub ring_assignation_data: Vec<CompactNodeType>,
/// Role changes that are staged but not yet applied.
pub staging: LwwMap<Uuid, NodeRoleV>,
/// Hash of the staged data.
pub staging_hash: Hash,
}
/// Optional role of a node; the `v09` code drops entries holding `None` from `roles`
/// when staged changes are applied.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
/// Role assigned to a node of the cluster.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRole {
/// Name of the zone the node belongs to.
pub zone: String,
/// Storage capacity of the node; `None` designates a gateway node (no storage).
pub capacity: Option<u64>,
/// Free-form tags attached to the node.
pub tags: Vec<String>,
}
impl garage_util::migrate::InitialFormat for ClusterLayout {}
}
mod v09 {
use super::v08;
use crate::ring::CompactNodeType;
use garage_util::crdt::{Lww, LwwMap};
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
pub use v08::{NodeRole, NodeRoleV};
/// Cluster layout in the `v09` format: the applied roles and assignment, plus the
/// staged (not yet applied) changes.
///
/// The field order is part of the serialized representation; do not reorder.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClusterLayout {
/// Layout version; incremented when staged changes are applied or reverted, and the
/// higher version wins when two layouts are merged.
pub version: u64,
/// Number of copies stored for each partition.
pub replication_factor: usize,
/// Size of a partition, in the same unit as node capacities.
pub partition_size: u64,
/// Parameters that were used to compute the current assignment.
pub parameters: LayoutParameters,
/// Currently applied role of each node.
pub roles: LwwMap<Uuid, NodeRoleV>,
/// Table translating compact node ids (indices) into UUIDs; storage nodes are placed
/// before gateway nodes when the table is rebuilt.
pub node_id_vec: Vec<Uuid>,
/// Flat partition assignment: the nodes of partition `p` are the entries
/// `[p * replication_factor, (p + 1) * replication_factor)`.
#[serde(with = "serde_bytes")]
pub ring_assignment_data: Vec<CompactNodeType>,
/// Layout parameters that are staged but not yet applied.
pub staging_parameters: Lww<LayoutParameters>,
/// Role changes that are staged but not yet applied.
pub staging_roles: LwwMap<Uuid, NodeRoleV>,
/// Hash of `staging_roles` and `staging_parameters`, used to detect staging changes.
pub staging_hash: Hash,
}
/// Tunable parameters taken into account when computing a partition assignment.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct LayoutParameters {
/// Constraint on the number of distinct zones each partition must be spread over.
pub zone_redundancy: ZoneRedundancy,
}
/// Zone-spreading constraint applied to every partition.
///
/// The variant order is part of the serialized representation; do not reorder.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ZoneRedundancy {
/// Each partition must have replicas in at least this many distinct zones.
AtLeast(usize),
/// Use as many zones as possible: resolved to the smaller of the number of zones
/// present in the roles and the replication factor.
Maximum,
}
impl garage_util::migrate::Migrate for ClusterLayout {
const VERSION_MARKER: &'static [u8] = b"G09layout";
type Previous = v08::ClusterLayout;
fn migrate(previous: Self::Previous) -> Self {
use itertools::Itertools;
let cap_mul = 1024 * 1024 * 1024;
let roles = multiply_all_capacities(previous.roles, cap_mul);
let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
let node_id_vec = previous.node_id_vec;
let mut tmp = previous.ring_assignation_data.clone();
tmp.sort();
let partition_size = tmp
.into_iter()
.dedup_with_count()
.map(|(npart, node)| {
roles
.get(&node_id_vec[node as usize])
.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
.unwrap_or(0) / npart as u64
})
.min()
.unwrap_or(0);
let parameters = LayoutParameters {
zone_redundancy: ZoneRedundancy::Maximum,
};
let mut res = Self {
version: previous.version,
replication_factor: previous.replication_factor,
partition_size,
parameters,
roles,
node_id_vec,
ring_assignment_data: previous.ring_assignation_data,
staging_parameters: Lww::new(parameters),
staging_roles,
staging_hash: [0u8; 32].into(),
};
res.staging_hash = res.calculate_staging_hash();
res
}
}
/// Returns a copy of `old_roles` in which every defined capacity is multiplied by `mul`.
///
/// Timestamps are preserved; entries without a role or without a capacity are copied
/// unchanged.
fn multiply_all_capacities(
    old_roles: LwwMap<Uuid, NodeRoleV>,
    mul: u64,
) -> LwwMap<Uuid, NodeRoleV> {
    let mut scaled_roles = LwwMap::new();
    for (node, ts, role) in old_roles.items() {
        let mut scaled = role.clone();
        if let Some(capacity) = scaled.0.as_mut().and_then(|r| r.capacity.as_mut()) {
            *capacity *= mul;
        }
        scaled_roles.merge_raw(node, *ts, &scaled);
    }
    scaled_roles
}
}
pub use v09::*;
/// Lets `LayoutParameters` be stored in CRDT containers through `AutoCrdt`.
impl AutoCrdt for LayoutParameters {
// NOTE(review): presumably makes the CRDT layer emit a warning when two differing
// values are merged — confirm against `garage_util::crdt::AutoCrdt`.
const WARN_IF_DIFFERENT: bool = true;
}
/// Lets `NodeRoleV` be stored in CRDT containers through `AutoCrdt`.
impl AutoCrdt for NodeRoleV {
// NOTE(review): presumably makes the CRDT layer emit a warning when two differing
// values are merged — confirm against `garage_util::crdt::AutoCrdt`.
const WARN_IF_DIFFERENT: bool = true;
}
impl NodeRole {
    /// Human-readable capacity of the node: a byte size, or `"gateway"` when the node
    /// has no capacity.
    pub fn capacity_string(&self) -> String {
        self.capacity.map_or_else(
            || "gateway".to_string(),
            |bytes| ByteSize::b(bytes).to_string_as(false),
        )
    }

    /// Tags of the node as a single comma-separated string.
    pub fn tags_string(&self) -> String {
        let separator = ",";
        self.tags.join(separator)
    }
}
/// Textual form of a zone redundancy: `maximum`, or the minimum number of zones.
impl fmt::Display for ZoneRedundancy {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            ZoneRedundancy::AtLeast(zones) => write!(f, "{}", zones),
            ZoneRedundancy::Maximum => f.write_str("maximum"),
        }
    }
}
/// Parses a zone redundancy from its textual form.
///
/// `"none"`, `"max"` and `"maximum"` all map to [`ZoneRedundancy::Maximum`]; any other
/// input must be an unsigned integer and maps to [`ZoneRedundancy::AtLeast`].
impl core::str::FromStr for ZoneRedundancy {
    type Err = &'static str;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if matches!(s, "none" | "max" | "maximum") {
            return Ok(ZoneRedundancy::Maximum);
        }
        s.parse::<usize>()
            .map(ZoneRedundancy::AtLeast)
            .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")
    }
}
impl ClusterLayout {
/// Creates an empty layout (version 0, no node, no assignment) for the given
/// replication factor, with the zone redundancy set to `Maximum`.
pub fn new(replication_factor: usize) -> Self {
    let parameters = LayoutParameters {
        zone_redundancy: ZoneRedundancy::Maximum,
    };

    let mut layout = ClusterLayout {
        version: 0,
        replication_factor,
        partition_size: 0,
        roles: LwwMap::new(),
        node_id_vec: vec![],
        ring_assignment_data: vec![],
        parameters,
        staging_parameters: Lww::<LayoutParameters>::new(parameters),
        staging_roles: LwwMap::new(),
        // Placeholder, replaced right below once the structure exists.
        staging_hash: [0u8; 32].into(),
    };
    layout.staging_hash = layout.calculate_staging_hash();
    layout
}
/// Hashes the staged roles together with the staged parameters.
fn calculate_staging_hash(&self) -> Hash {
    let staged = (&self.staging_roles, &self.staging_parameters);
    let encoded = nonversioned_encode(&staged).unwrap();
    blake2sum(&encoded[..])
}
/// Merges `other` into this layout and reports whether anything changed.
///
/// A layout with a higher version replaces this one entirely; a lower version is
/// ignored; for equal versions only the staged data is merged.
pub fn merge(&mut self, other: &ClusterLayout) -> bool {
    match self.version.cmp(&other.version) {
        // `other` is more recent: adopt it wholesale.
        Ordering::Less => {
            *self = other.clone();
            true
        }
        // `other` is outdated: nothing to take from it.
        Ordering::Greater => false,
        Ordering::Equal => {
            self.staging_parameters.merge(&other.staging_parameters);
            self.staging_roles.merge(&other.staging_roles);

            let fresh_hash = self.calculate_staging_hash();
            let staging_changed = self.staging_hash != fresh_hash;
            self.staging_hash = fresh_hash;
            staging_changed
        }
    }
}
/// Applies the staged roles and parameters, recomputes the partition assignment and
/// bumps the layout version.
///
/// `version` must be `Some(current version + 1)`; it acts as a confirmation that the
/// caller is working on the expected layout. Returns the new layout together with the
/// report of the assignment computation.
pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
    let requested = match version {
        Some(v) => v,
        None => {
            let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
            return Err(Error::Message(error.into()));
        }
    };
    if requested != self.version + 1 {
        return Err(Error::Message("Invalid new layout version".into()));
    }

    // Fold the staged roles into the applied ones and forget removed nodes.
    self.roles.merge(&self.staging_roles);
    self.roles.retain(|(_, _, v)| v.0.is_some());
    self.parameters = *self.staging_parameters.get();

    // The staging area is now empty.
    self.staging_roles.clear();
    self.staging_hash = self.calculate_staging_hash();

    let report = self.calculate_partition_assignment()?;
    self.version += 1;

    Ok((self, report))
}
/// Discards the staged roles, resets the staged parameters to the applied ones and
/// bumps the layout version.
///
/// `version` must be `Some(current version + 1)`; it acts as a confirmation that the
/// caller is working on the expected layout.
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
    let requested = match version {
        Some(v) => v,
        None => {
            let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
            return Err(Error::Message(error.into()));
        }
    };
    if requested != self.version + 1 {
        return Err(Error::Message("Invalid new layout version".into()));
    }

    self.staging_roles.clear();
    self.staging_parameters.update(self.parameters);
    self.staging_hash = self.calculate_staging_hash();

    self.version += 1;
    Ok(self)
}
/// UUIDs of the nodes of the layout, indexed by compact node id.
pub fn node_ids(&self) -> &[Uuid] {
    self.node_id_vec.as_slice()
}
pub fn num_nodes(&self) -> usize {
self.node_id_vec.len()
}
/// Role currently applied to `node`, or `None` if the node is unknown or has no role.
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
    let NodeRoleV(role) = self.roles.get(node)?;
    role.as_ref()
}
/// UUIDs of the nodes that have a capacity (storage nodes), in `node_id_vec` order.
fn nongateway_nodes(&self) -> Vec<Uuid> {
    self.node_id_vec
        .iter()
        .filter(|uuid| matches!(self.node_role(uuid), Some(role) if role.capacity.is_some()))
        .copied()
        .collect()
}
fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
match self.node_role(uuid) {
Some(role) => Ok(role.zone.clone()),
_ => Err(Error::Message(
"The Uuid does not correspond to a node present in the cluster.".into(),
)),
}
}
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
match self.node_role(uuid) {
Some(NodeRole {
capacity: Some(cap),
zone: _,
tags: _,
}) => Ok(*cap),
_ => Err(Error::Message(
"The Uuid does not correspond to a node present in the \
cluster or this node does not have a positive capacity."
.into(),
)),
}
}
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
for (i, id) in self.node_id_vec.iter().enumerate() {
if id == uuid {
let mut count = 0;
for nod in self.ring_assignment_data.iter() {
if i as u8 == *nod {
count += 1
}
}
return Ok(count);
}
}
Err(Error::Message(
"The Uuid does not correspond to a node present in the \
cluster or this node does not have a positive capacity."
.into(),
))
}
/// Sum of the capacities of all storage nodes.
fn get_total_capacity(&self) -> Result<u64, Error> {
    self.nongateway_nodes()
        .iter()
        .map(|uuid| self.get_node_capacity(uuid))
        .sum()
}
/// Resolves the zone redundancy parameter into a concrete number of zones.
///
/// `AtLeast(v)` yields `v`; `Maximum` yields the number of distinct zones found in the
/// roles, capped by the replication factor.
fn effective_zone_redundancy(&self) -> usize {
    if let ZoneRedundancy::AtLeast(required) = self.parameters.zone_redundancy {
        return required;
    }

    let distinct_zones: HashSet<&str> = self
        .roles
        .items()
        .iter()
        .filter_map(|(_, _, role)| role.0.as_ref())
        .map(|role| role.zone.as_str())
        .collect();
    distinct_zones.len().min(self.replication_factor)
}
pub fn check(&self) -> Result<(), String> {
let staging_hash = self.calculate_staging_hash();
if staging_hash != self.staging_hash {
return Err("staging_hash is incorrect".into());
}
let mut expected_nodes = self
.roles
.items()
.iter()
.filter(|(_, _, v)| v.0.is_some())
.map(|(id, _, _)| *id)
.collect::<Vec<_>>();
expected_nodes.sort();
let mut node_id_vec = self.node_id_vec.clone();
node_id_vec.sort();
if expected_nodes != node_id_vec {
return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes));
}
let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor;
if self.ring_assignment_data.len() != expected_assignment_data_len {
return Err(format!(
"ring_assignment_data has incorrect length {} instead of {}",
self.ring_assignment_data.len(),
expected_assignment_data_len
));
}
for x in self.ring_assignment_data.iter() {
if *x as usize >= self.node_id_vec.len() {
return Err(format!(
"ring_assignment_data contains invalid node id {}",
*x
));
}
let node = self.node_id_vec[*x as usize];
match self.roles.get(&node) {
Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
_ => return Err("ring_assignment_data contains id of a gateway node".into()),
}
}
let zone_redundancy = self.effective_zone_redundancy();
let rf = self.replication_factor;
for p in 0..(1 << PARTITION_BITS) {
let nodes_of_p = self.ring_assignment_data[rf * p..rf * (p + 1)].to_vec();
if nodes_of_p.iter().unique().count() != rf {
return Err(format!("partition does not contain {} unique node ids", rf));
}
let zones_of_p = nodes_of_p
.iter()
.map(|n| {
self.get_node_zone(&self.node_id_vec[*n as usize])
.expect("Zone not found.")
})
.collect::<Vec<_>>();
if zones_of_p.iter().unique().count() < zone_redundancy {
return Err(format!(
"nodes of partition are in less than {} distinct zones",
zone_redundancy
));
}
}
let mut node_usage = vec![0; MAX_NODE_NUMBER];
for n in self.ring_assignment_data.iter() {
node_usage[*n as usize] += 1;
}
for (n, usage) in node_usage.iter().enumerate() {
if *usage > 0 {
let uuid = self.node_id_vec[n];
let partusage = usage * self.partition_size;
let nodecap = self.get_node_capacity(&uuid).unwrap();
if partusage > nodecap {
return Err(format!(
"node usage ({}) is bigger than node capacity ({})",
usage * self.partition_size,
nodecap
));
}
}
}
let cl2 = self.clone();
let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap();
match cl2.compute_optimal_partition_size(&zone_to_id, zone_redundancy) {
Ok(s) if s != self.partition_size => {
return Err(format!(
"partition_size ({}) is different than optimal value ({})",
self.partition_size, s
))
}
Err(e) => return Err(format!("could not calculate optimal partition size: {}", e)),
_ => (),
}
Ok(())
}
}
impl ClusterLayout {
fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
let old_assignment_opt = self.update_node_id_vec()?;
let zone_redundancy = self.effective_zone_redundancy();
let mut msg = Message::new();
msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into());
msg.push("".into());
msg.push(format!(
"Partitions are \
replicated {} times on at least {} distinct zones.",
self.replication_factor, zone_redundancy
));
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
let nb_nongateway_nodes = self.nongateway_nodes().len();
if nb_nongateway_nodes < self.replication_factor {
return Err(Error::Message(format!(
"The number of nodes with positive \
capacity ({}) is smaller than the replication factor ({}).",
nb_nongateway_nodes, self.replication_factor
)));
}
if id_to_zone.len() < zone_redundancy {
return Err(Error::Message(format!(
"The number of zones with non-gateway \
nodes ({}) is smaller than the redundancy parameter ({})",
id_to_zone.len(),
zone_redundancy
)));
}
let partition_size = self.compute_optimal_partition_size(&zone_to_id, zone_redundancy)?;
msg.push("".into());
if old_assignment_opt.is_some() {
msg.push(format!(
"Optimal partition size: {} ({} in previous layout)",
ByteSize::b(partition_size).to_string_as(false),
ByteSize::b(self.partition_size).to_string_as(false)
));
} else {
msg.push(format!(
"Optimal partition size: {}",
ByteSize::b(partition_size).to_string_as(false)
));
}
self.partition_size = partition_size;
if partition_size < 100 {
msg.push(
"WARNING: The partition size is low (< 100), make sure the capacities of your nodes are correct and are of at least a few MB"
.into(),
);
}
let mut gflow =
self.compute_candidate_assignment(&zone_to_id, &old_assignment_opt, zone_redundancy)?;
if let Some(assoc) = &old_assignment_opt {
self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
}
msg.extend(self.output_stat(&gflow, &old_assignment_opt, &zone_to_id, &id_to_zone)?);
self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
if let Err(e) = self.check() {
return Err(Error::Message(
format!("Layout check returned an error: {}\nOriginal result of computation: <<<<\n{}\n>>>>", e, msg.join("\n"))
));
}
Ok(msg)
}
fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> {
let new_non_gateway_nodes: Vec<Uuid> = self
.roles
.items()
.iter()
.filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity.is_some()))
.map(|(k, _, _)| *k)
.collect();
if new_non_gateway_nodes.len() > MAX_NODE_NUMBER {
return Err(Error::Message(format!(
"There are more than {} non-gateway nodes in the new \
layout. This is not allowed.",
MAX_NODE_NUMBER
)));
}
let new_gateway_nodes: Vec<Uuid> = self
.roles
.items()
.iter()
.filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_none()))
.map(|(k, _, _)| *k)
.collect();
let mut new_node_id_vec = Vec::<Uuid>::new();
new_node_id_vec.extend(new_non_gateway_nodes);
new_node_id_vec.extend(new_gateway_nodes);
let old_node_id_vec = self.node_id_vec.clone();
self.node_id_vec = new_node_id_vec.clone();
if self.ring_assignment_data.is_empty() {
return Ok(None);
}
if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor {
return Err(Error::Message(
"The old assignment does not have a size corresponding to \
the old replication factor or the number of partitions."
.into(),
));
}
let mut uuid_to_new_id = HashMap::<Uuid, usize>::new();
for (i, uuid) in new_node_id_vec.iter().enumerate() {
uuid_to_new_id.insert(*uuid, i);
}
let mut old_assignment = vec![Vec::<usize>::new(); NB_PARTITIONS];
let rf = self.replication_factor;
for (p, old_assign_p) in old_assignment.iter_mut().enumerate() {
for old_id in &self.ring_assignment_data[p * rf..(p + 1) * rf] {
let uuid = old_node_id_vec[*old_id as usize];
if uuid_to_new_id.contains_key(&uuid) {
old_assign_p.push(uuid_to_new_id[&uuid]);
}
}
}
self.ring_assignment_data = Vec::<CompactNodeType>::new();
Ok(Some(old_assignment))
}
/// Numbers the zones that contain at least one storage node.
///
/// Returns the zone names indexed by id, and the reverse map from zone name to id. Ids
/// are attributed in order of first appearance among the storage nodes.
fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
    let mut zone_names = Vec::<String>::new();
    let mut zone_ids = HashMap::<String, usize>::new();

    for uuid in self.nongateway_nodes() {
        let role = self.node_role(&uuid).unwrap();
        if role.capacity.is_none() || zone_ids.contains_key(&role.zone) {
            continue;
        }
        zone_ids.insert(role.zone.clone(), zone_names.len());
        zone_names.push(role.zone.clone());
    }

    Ok((zone_names, zone_ids))
}
/// Computes the largest partition size for which all partition replicas can be placed.
///
/// A size is feasible when the maximal flow of the corresponding flow graph reaches
/// `NB_PARTITIONS * replication_factor`. The result is found by dichotomy between 1
/// and the total capacity of the cluster.
///
/// # Errors
///
/// Fails if even partitions of size 1 cannot be stored, or if building or solving the
/// flow graph fails.
fn compute_optimal_partition_size(
    &self,
    zone_to_id: &HashMap<String, usize>,
    zone_redundancy: usize,
) -> Result<u64, Error> {
    let required_flow = (NB_PARTITIONS * self.replication_factor) as i64;
    let empty_set = HashSet::<(usize, usize)>::new();

    let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set, zone_redundancy)?;
    g.compute_maximal_flow()?;
    if g.get_flow_value()? < required_flow {
        return Err(Error::Message(
            "The storage capacity of the cluster is too small. It is \
            impossible to store partitions of size 1."
                .into(),
        ));
    }

    // Invariant: `s_down` is feasible, `s_up` is the upper bound of the search.
    let mut s_down = 1;
    let mut s_up = self.get_total_capacity()?;
    while s_down + 1 < s_up {
        // Same value as (s_down + s_up) / 2, without the risk of overflowing the sum.
        let mid = s_down + (s_up - s_down) / 2;
        g = self.generate_flow_graph(mid, zone_to_id, &empty_set, zone_redundancy)?;
        g.compute_maximal_flow()?;
        if g.get_flow_value()? < required_flow {
            s_up = mid;
        } else {
            s_down = mid;
        }
    }

    Ok(s_down)
}
/// Lists the vertices of the flow graph: the source and the sink, then for each
/// partition its `Pup` and `Pdown` vertices followed by one `PZ` vertex per zone, and
/// finally one `N` vertex per node.
fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> {
    let mut vertices = vec![Vertex::Source, Vertex::Sink];
    for partition in 0..NB_PARTITIONS {
        vertices.push(Vertex::Pup(partition));
        vertices.push(Vertex::Pdown(partition));
        vertices.extend((0..nb_zones).map(|zone| Vertex::PZ(partition, zone)));
    }
    vertices.extend((0..nb_nodes).map(Vertex::N));
    vertices
}
fn generate_flow_graph(
&self,
partition_size: u64,
zone_to_id: &HashMap<String, usize>,
exclude_assoc: &HashSet<(usize, usize)>,
zone_redundancy: usize,
) -> Result<Graph<FlowEdge>, Error> {
let vertices =
ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
let mut g = Graph::<FlowEdge>::new(&vertices);
let nb_zones = zone_to_id.len();
for p in 0..NB_PARTITIONS {
g.add_edge(Vertex::Source, Vertex::Pup(p), zone_redundancy as u64)?;
g.add_edge(
Vertex::Source,
Vertex::Pdown(p),
(self.replication_factor - zone_redundancy) as u64,
)?;
for z in 0..nb_zones {
g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?;
g.add_edge(
Vertex::Pdown(p),
Vertex::PZ(p, z),
self.replication_factor as u64,
)?;
}
}
for n in 0..self.nongateway_nodes().len() {
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
for p in 0..NB_PARTITIONS {
if !exclude_assoc.contains(&(p, n)) {
g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
}
}
}
Ok(g)
}
/// Computes a first assignment as a maximal flow of the flow graph.
///
/// When a previous assignment is given, a maximal flow is first computed using only
/// the (partition, node) pairs of that previous assignment; the other pairs are then
/// added back and the flow is completed.
fn compute_candidate_assignment(
    &self,
    zone_to_id: &HashMap<String, usize>,
    prev_assign_opt: &Option<Vec<Vec<usize>>>,
    zone_redundancy: usize,
) -> Result<Graph<FlowEdge>, Error> {
    // Pairs (partition, node) that were NOT part of the previous assignment.
    let mut excluded = HashSet::<(usize, usize)>::new();
    if let Some(prev_assign) = prev_assign_opt {
        let storage_node_count = self.nongateway_nodes().len();
        for (p, kept_nodes) in prev_assign.iter().enumerate() {
            for n in 0..storage_node_count {
                excluded.insert((p, n));
            }
            for &n in kept_nodes {
                excluded.remove(&(p, n));
            }
        }
    }

    // First pass: only the previous associations are available.
    let mut graph = self.generate_flow_graph(
        self.partition_size,
        zone_to_id,
        &excluded,
        zone_redundancy,
    )?;
    graph.compute_maximal_flow()?;

    // Second pass: put the excluded edges back and complete the flow.
    for &(p, n) in &excluded {
        let zone = self.get_node_zone(&self.node_id_vec[n])?;
        graph.add_edge(Vertex::PZ(p, zone_to_id[&zone]), Vertex::N(n), 1)?;
    }
    graph.compute_maximal_flow()?;

    Ok(graph)
}
/// Optimizes the flow so that it reuses associations of the previous assignment.
///
/// Every edge corresponding to a previous (partition, node) association gets a cost of
/// -1, and the flow is optimized against this cost with paths of length at most four
/// times the number of storage nodes.
fn minimize_rebalance_load(
    &self,
    gflow: &mut Graph<FlowEdge>,
    zone_to_id: &HashMap<String, usize>,
    prev_assign: &[Vec<usize>],
) -> Result<(), Error> {
    let mut cost = CostFunction::new();
    for (p, previous_nodes) in prev_assign.iter().enumerate() {
        for &n in previous_nodes {
            let zone = self.get_node_zone(&self.node_id_vec[n])?;
            let zone_id = zone_to_id[&zone];
            cost.insert((Vertex::PZ(p, zone_id), Vertex::N(n)), -1);
        }
    }

    let max_path_length = 4 * self.nongateway_nodes().len();
    gflow.optimize_flow_with_cost(&cost, max_path_length)?;

    Ok(())
}
fn update_ring_from_flow(
&mut self,
nb_zones: usize,
gflow: &Graph<FlowEdge>,
) -> Result<(), Error> {
self.ring_assignment_data = Vec::<CompactNodeType>::new();
for p in 0..NB_PARTITIONS {
for z in 0..nb_zones {
let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
for vertex in assoc_vertex.iter() {
if let Vertex::N(n) = vertex {
self.ring_assignment_data.push((*n).try_into().unwrap());
}
}
}
}
if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor {
return Err(Error::Message(
"Critical Error : the association ring we produced does not \
have the right size."
.into(),
));
}
Ok(())
}
/// Builds the statistics part of the report for the assignment described by `gflow`.
///
/// The report contains the usable and effective capacities, the number of partition
/// copies that have to be transferred (when a previous assignment is given), and a
/// table with, for each zone, the per-node and total partition counts and capacities.
///
/// `prev_assign_opt` is the previous assignment expressed with the current node
/// indices, `zone_to_id` and `id_to_zone` are the two directions of the zone numbering.
fn output_stat(
&self,
gflow: &Graph<FlowEdge>,
prev_assign_opt: &Option<Vec<Vec<usize>>>,
zone_to_id: &HashMap<String, usize>,
id_to_zone: &[String],
) -> Result<Message, Error> {
let mut msg = Message::new();
// Capacity actually used by the assignment, compared to the sum of node capacities.
let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64;
let total_cap = self.get_total_capacity()?;
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
msg.push(format!(
"Usable capacity / total cluster capacity: {} / {} ({:.1} %)",
ByteSize::b(used_cap).to_string_as(false),
ByteSize::b(total_cap).to_string_as(false),
percent_cap
));
msg.push(format!(
"Effective capacity (replication factor {}): {}",
self.replication_factor,
ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false)
));
// Below 80% of usable capacity, hint at the probable cause.
if percent_cap < 80. {
msg.push("".into());
msg.push(
"If the percentage is too low, it might be that the \
cluster topology and redundancy constraints are forcing the use of nodes/zones with small \
storage capacities."
.into(),
);
msg.push(
"You might want to move storage capacity between zones or relax the redundancy constraint."
.into(),
);
msg.push(
"See the detailed statistics below and look for saturated nodes/zones.".into(),
);
}
// Counters of stored and newly assigned partitions, per node and per zone.
let storing_nodes = self.nongateway_nodes();
let mut new_partitions = vec![0; storing_nodes.len()];
let mut stored_partitions = vec![0; storing_nodes.len()];
let mut new_partitions_zone = vec![0; id_to_zone.len()];
let mut stored_partitions_zone = vec![0; id_to_zone.len()];
for p in 0..NB_PARTITIONS {
for z in 0..id_to_zone.len() {
// Nodes of zone z that receive a copy of partition p.
let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
if !pz_nodes.is_empty() {
stored_partitions_zone[z] += 1;
if let Some(prev_assign) = prev_assign_opt {
// The partition is new in the zone if none of its previous nodes was in it.
let mut old_zones_of_p = Vec::<usize>::new();
for n in prev_assign[p].iter() {
old_zones_of_p
.push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
}
if !old_zones_of_p.contains(&z) {
new_partitions_zone[z] += 1;
}
}
}
for vert in pz_nodes.iter() {
if let Vertex::N(n) = *vert {
stored_partitions[n] += 1;
if let Some(prev_assign) = prev_assign_opt {
if !prev_assign[p].contains(&n) {
new_partitions[n] += 1;
}
}
}
}
}
}
// Without a previous assignment, every stored partition is a new one.
if prev_assign_opt.is_none() {
new_partitions = stored_partitions.clone();
}
msg.push("".into());
if prev_assign_opt.is_some() {
let total_new_partitions: usize = new_partitions.iter().sum();
msg.push(format!(
"A total of {} new copies of partitions need to be \
transferred.",
total_new_partitions
));
msg.push("".into());
}
// One table section per zone: a header, one row per node, then a TOTAL row.
let mut table = vec![];
for z in 0..id_to_zone.len() {
let mut nodes_of_z = Vec::<usize>::new();
for n in 0..storing_nodes.len() {
if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] {
nodes_of_z.push(n);
}
}
let replicated_partitions: usize =
nodes_of_z.iter().map(|n| stored_partitions[*n]).sum();
table.push(format!(
"{}\tTags\tPartitions\tCapacity\tUsable capacity",
id_to_zone[z]
));
let available_cap_z: u64 = self.partition_size * replicated_partitions as u64;
let mut total_cap_z = 0;
for n in nodes_of_z.iter() {
total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?;
}
let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32);
for n in nodes_of_z.iter() {
let available_cap_n = stored_partitions[*n] as u64 * self.partition_size;
let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?;
let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string();
table.push(format!(
" {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)",
self.node_id_vec[*n],
tags_n,
stored_partitions[*n],
new_partitions[*n],
ByteSize::b(total_cap_n).to_string_as(false),
ByteSize::b(available_cap_n).to_string_as(false),
(available_cap_n as f32) / (total_cap_n as f32) * 100.0,
));
}
table.push(format!(
" TOTAL\t\t{} ({} unique)\t{}\t{} ({:.1}%)",
replicated_partitions,
stored_partitions_zone[z],
ByteSize::b(total_cap_z).to_string_as(false),
ByteSize::b(available_cap_z).to_string_as(false),
percent_cap_z
));
table.push("".into());
}
// The whole table is formatted into a single report entry.
msg.push(format_table::format_table_to_string(table));
Ok(msg)
}
}
#[cfg(test)]
mod tests {
use super::{Error, *};
use std::cmp::min;
/// Tries to build an assignment greedily with partitions one unit larger than the
/// layout's partition size.
///
/// Each zone gets a number of tokens derived from the capacities of its nodes, and
/// tokens are handed out replica by replica and partition by partition. Returns
/// `Ok(true)` when the procedure runs out of zones before placing everything, and
/// `Ok(false)` when it places everything or when there is no zone at all.
fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
    let over_size = cl.partition_size + 1;
    let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
    if zones.is_empty() {
        return Ok(false);
    }

    // Tokens available per zone name.
    let mut zone_token: HashMap<String, usize> = zones.iter().map(|z| (z.clone(), 0)).collect();
    for uuid in cl.nongateway_nodes().iter() {
        let zone = cl.get_node_zone(uuid)?;
        let capacity = cl.get_node_capacity(uuid)?;
        let tokens = zone_token[&zone] + min(NB_PARTITIONS, (capacity / over_size) as usize);
        zone_token.insert(zone, tokens);
    }

    // Same information, indexed by zone id.
    let mut id_zone_token = vec![0; zones.len()];
    for (zone, tokens) in &zone_token {
        id_zone_token[zone_to_id[zone]] = *tokens;
    }

    let redundancy = cl.effective_zone_redundancy();
    let mut nb_token = vec![0; NB_PARTITIONS];
    let mut last_zone = vec![zones.len(); NB_PARTITIONS];
    let mut curr_zone = 0;

    for replic in 0..cl.replication_factor {
        for p in 0..NB_PARTITIONS {
            // Skip zones that are exhausted, or that the partition must leave to reach
            // the required redundancy.
            while id_zone_token[curr_zone] == 0
                || (last_zone[p] == curr_zone
                    && redundancy - nb_token[p] <= cl.replication_factor - replic)
            {
                curr_zone += 1;
                if curr_zone >= zones.len() {
                    return Ok(true);
                }
            }
            id_zone_token[curr_zone] -= 1;
            if last_zone[p] != curr_zone {
                nb_token[p] += 1;
                last_zone[p] = curr_zone;
            }
        }
    }

    Ok(false)
}
/// Prints every line of the report on standard output.
fn show_msg(msg: &Message) {
    msg.iter().for_each(|line| println!("{}", line));
}
/// Stages a storage role for `node_id_vec.len()` nodes, together with the requested
/// zone redundancy, and refreshes the staging hash.
///
/// The i-th node is identified by the id made of 32 bytes equal to `i`; it gets the
/// zone `node_zone_vec[i]` and the capacity `node_capacity_vec[i]`. Only the length of
/// `node_id_vec` is used.
///
/// The node id is derived directly from `i`. The layout's own `node_id_vec` is not
/// touched: it is rebuilt when the staged changes are applied, and reading ids back
/// from it would return stale entries once the layout already contains nodes.
///
/// The `&Vec` parameters are kept on purpose: the callers rely on them to infer the
/// type of their collected vectors.
fn update_layout(
    cl: &mut ClusterLayout,
    node_id_vec: &Vec<u8>,
    node_capacity_vec: &Vec<u64>,
    node_zone_vec: &Vec<String>,
    zone_redundancy: usize,
) {
    for i in 0..node_id_vec.len() {
        let node_id = FixedBytes32::try_from(&[i as u8; 32])
            .expect("a 32-byte slice must convert to a node id");

        let update = cl.staging_roles.update_mutator(
            node_id,
            NodeRoleV(Some(NodeRole {
                zone: node_zone_vec[i].clone(),
                capacity: Some(node_capacity_vec[i]),
                tags: vec![],
            })),
        );
        cl.staging_roles.merge(&update);
    }

    cl.staging_parameters.update(LayoutParameters {
        zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy),
    });
    cl.staging_hash = cl.calculate_staging_hash();
}
#[test]
fn test_assignment() {
    /// Applies the staged changes, prints the report and validates the new layout.
    fn apply_and_verify(layout: ClusterLayout) -> ClusterLayout {
        let next_version = layout.version + 1;
        let (layout, report) = layout.apply_staged_changes(Some(next_version)).unwrap();
        show_msg(&report);
        assert_eq!(layout.check(), Ok(()));
        assert!(matches!(check_against_naive(&layout), Ok(true)));
        layout
    }

    /// Builds the owned list of zone names expected by `update_layout`.
    fn zone_names(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    // Scenario 1: three nodes in three zones, one replica per zone.
    let ids: Vec<u8> = vec![1, 2, 3];
    let capacities: Vec<u64> = vec![4000, 1000, 2000];
    let zones = zone_names(&["A", "B", "C"]);
    let mut cl = ClusterLayout::new(3);
    update_layout(&mut cl, &ids, &capacities, &zones, 3);
    let mut cl = apply_and_verify(cl);

    // Scenario 2: nine nodes, redundancy relaxed to two zones.
    let ids: Vec<u8> = (1..=9).collect();
    let capacities: Vec<u64> = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
    let zones = zone_names(&["A", "B", "C", "C", "C", "B", "G", "H", "I"]);
    update_layout(&mut cl, &ids, &capacities, &zones, 2);
    let mut cl = apply_and_verify(cl);

    // Scenario 3: same nodes, some capacities changed, redundancy back to three zones.
    let capacities: Vec<u64> = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
    update_layout(&mut cl, &ids, &capacities, &zones, 3);
    let mut cl = apply_and_verify(cl);

    // Scenario 4: very unbalanced capacities, a single zone is enough.
    let capacities: Vec<u64> = vec![
        4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000,
    ];
    update_layout(&mut cl, &ids, &capacities, &zones, 1);
    apply_and_verify(cl);
}
}