use std::collections::HashSet;
use crate::sync_sim::runtime::SimTime;
use crate::sync_sim::types::NodeId;
#[derive(Debug, Clone)]
pub enum PartitionSpec {
Bidirectional { groups: Vec<Vec<NodeId>> },
Directional { blocked: Vec<(NodeId, NodeId)> },
}
impl PartitionSpec {
pub fn split(group_a: Vec<NodeId>, group_b: Vec<NodeId>) -> Self {
Self::Bidirectional {
groups: vec![group_a, group_b],
}
}
pub fn isolate(node: NodeId, others: Vec<NodeId>) -> Self {
Self::Bidirectional {
groups: vec![vec![node], others],
}
}
pub fn block(from: NodeId, to: NodeId) -> Self {
Self::Directional {
blocked: vec![(from, to)],
}
}
pub fn blocks(&self, from: &NodeId, to: &NodeId) -> bool {
match self {
Self::Bidirectional { groups } => {
let from_group = groups.iter().position(|g| g.contains(from));
let to_group = groups.iter().position(|g| g.contains(to));
match (from_group, to_group) {
(Some(fg), Some(tg)) => fg != tg, _ => false, }
}
Self::Directional { blocked } => blocked.iter().any(|(f, t)| f == from && t == to),
}
}
}
#[derive(Debug, Clone)]
struct ActivePartition {
spec: PartitionSpec,
start_time: SimTime,
end_time: Option<SimTime>,
}
impl ActivePartition {
fn is_active_at(&self, now: SimTime) -> bool {
let started = self.start_time <= now;
let not_ended = self.end_time.map_or(true, |end| end > now);
started && not_ended
}
}
#[derive(Debug, Default)]
pub struct PartitionManager {
partitions: Vec<ActivePartition>,
blocked_cache: HashSet<(String, String)>,
cache_time: Option<SimTime>,
}
impl PartitionManager {
pub fn new() -> Self {
Self::default()
}
pub fn add_partition(&mut self, spec: PartitionSpec, start: SimTime, end: Option<SimTime>) {
self.partitions.push(ActivePartition {
spec,
start_time: start,
end_time: end,
});
self.invalidate_cache();
}
pub fn remove_partitions<F>(&mut self, predicate: F)
where
F: Fn(&PartitionSpec) -> bool,
{
self.partitions.retain(|p| !predicate(&p.spec));
self.invalidate_cache();
}
pub fn clear(&mut self) {
self.partitions.clear();
self.invalidate_cache();
}
pub fn is_partitioned(&mut self, from: &NodeId, to: &NodeId, now: SimTime) -> bool {
self.partitions
.retain(|p| p.end_time.map_or(true, |end| end > now));
for partition in &self.partitions {
if partition.is_active_at(now) && partition.spec.blocks(from, to) {
return true;
}
}
false
}
pub fn partition_count_at(&self, now: SimTime) -> usize {
self.partitions
.iter()
.filter(|p| p.is_active_at(now))
.count()
}
pub fn partition_count(&self) -> usize {
self.partitions.len()
}
pub fn has_partitions_at(&self, now: SimTime) -> bool {
self.partitions.iter().any(|p| p.is_active_at(now))
}
pub fn has_partitions(&self) -> bool {
!self.partitions.is_empty()
}
fn invalidate_cache(&mut self) {
self.blocked_cache.clear();
self.cache_time = None;
}
pub fn get_blocked_pairs(&self, nodes: &[NodeId], now: SimTime) -> Vec<(NodeId, NodeId)> {
let mut blocked = Vec::new();
for partition in &self.partitions {
if partition.is_active_at(now) {
for from in nodes {
for to in nodes {
if from != to && partition.spec.blocks(from, to) {
blocked.push((from.clone(), to.clone()));
}
}
}
}
}
blocked
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bidirectional_partition() {
let spec = PartitionSpec::split(
vec![NodeId::new("a"), NodeId::new("b")],
vec![NodeId::new("c"), NodeId::new("d")],
);
assert!(!spec.blocks(&NodeId::new("a"), &NodeId::new("b")));
assert!(!spec.blocks(&NodeId::new("c"), &NodeId::new("d")));
assert!(spec.blocks(&NodeId::new("a"), &NodeId::new("c")));
assert!(spec.blocks(&NodeId::new("c"), &NodeId::new("a"))); assert!(spec.blocks(&NodeId::new("b"), &NodeId::new("d")));
}
#[test]
fn test_directional_partition() {
let spec = PartitionSpec::block(NodeId::new("a"), NodeId::new("b"));
assert!(spec.blocks(&NodeId::new("a"), &NodeId::new("b")));
assert!(!spec.blocks(&NodeId::new("b"), &NodeId::new("a")));
assert!(!spec.blocks(&NodeId::new("a"), &NodeId::new("c")));
}
#[test]
fn test_isolate_partition() {
let spec = PartitionSpec::isolate(
NodeId::new("isolated"),
vec![NodeId::new("a"), NodeId::new("b"), NodeId::new("c")],
);
assert!(spec.blocks(&NodeId::new("isolated"), &NodeId::new("a")));
assert!(spec.blocks(&NodeId::new("isolated"), &NodeId::new("b")));
assert!(spec.blocks(&NodeId::new("a"), &NodeId::new("isolated")));
assert!(!spec.blocks(&NodeId::new("a"), &NodeId::new("b")));
}
#[test]
fn test_partition_manager_timing() {
let mut manager = PartitionManager::new();
let now = SimTime::from_millis(100);
let end = SimTime::from_millis(200);
manager.add_partition(
PartitionSpec::split(vec![NodeId::new("a")], vec![NodeId::new("b")]),
now,
Some(end),
);
assert!(manager.is_partitioned(&NodeId::new("a"), &NodeId::new("b"), now));
assert!(manager.is_partitioned(
&NodeId::new("a"),
&NodeId::new("b"),
SimTime::from_millis(150)
));
assert!(!manager.is_partitioned(
&NodeId::new("a"),
&NodeId::new("b"),
SimTime::from_millis(200)
));
}
#[test]
fn test_partition_manager_permanent() {
let mut manager = PartitionManager::new();
manager.add_partition(
PartitionSpec::split(vec![NodeId::new("a")], vec![NodeId::new("b")]),
SimTime::ZERO,
None, );
assert!(manager.is_partitioned(
&NodeId::new("a"),
&NodeId::new("b"),
SimTime::from_millis(1_000_000)
));
}
#[test]
fn test_partition_manager_clear() {
let mut manager = PartitionManager::new();
manager.add_partition(
PartitionSpec::split(vec![NodeId::new("a")], vec![NodeId::new("b")]),
SimTime::ZERO,
None,
);
assert!(manager.has_partitions());
manager.clear();
assert!(!manager.has_partitions());
assert!(!manager.is_partitioned(&NodeId::new("a"), &NodeId::new("b"), SimTime::ZERO));
}
}