use crate::model::replica_id::ReplicaId;
use crate::TypeConfig;
use std::fmt::{Display, Formatter};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
/// Cheaply cloneable handle to the state of one replica group.
///
/// The data itself lives in a single [`ReplicaGroupWrapper`] behind an
/// [`Arc`], so cloning this handle copies only the pointer: every clone refers
/// to the same underlying group state.
#[derive(Debug, Clone)]
pub struct ReplicaGroup<C>
where
    C: TypeConfig,
{
    /// Reference-counted group state shared by all clones of this handle.
    inner: Arc<ReplicaGroupWrapper<C>>,
}
/// Shared state of a replica group.
///
/// `group_name`, `primary` and `term` are plain fields fixed when the value is
/// built. The list of secondaries and the version counter use interior
/// mutability (`RwLock` / `AtomicUsize`) so they can be updated through a
/// shared reference.
#[derive(Debug)]
pub struct ReplicaGroupWrapper<C>
where
    C: TypeConfig,
{
    /// Name identifying the group.
    group_name: String,
    /// Node id of the group's primary.
    primary: C::NodeId,
    /// Node ids of the group's secondaries, guarded by a read-write lock.
    secondaries: RwLock<Vec<C::NodeId>>,
    /// Counter incremented each time a secondary is added or removed.
    version: AtomicUsize,
    /// Term value supplied at construction.
    // NOTE(review): presumably a leadership/election term — confirm with the
    // code that constructs groups; nothing in this file interprets it.
    term: usize,
}
/// Renders the group state as a single human-readable line.
///
/// The secondaries are printed as a plain list of node ids. Formatting the
/// `RwLock` field directly with `{:?}` would instead emit the lock's own
/// `Debug` form (`RwLock { data: [...], poisoned: false, .. }`), leaking lock
/// internals into the output.
impl<C> Display for ReplicaGroupWrapper<C>
where
    C: TypeConfig,
{
    /// Writes `ReplicaGroup { group_name: .., primary: .., secondaries: ..,
    /// version: .., term: .. }` to `f`.
    ///
    /// The secondaries lock is only *tried*, never waited on, so formatting
    /// cannot block or deadlock when a writer currently holds the lock; in
    /// that case the list is rendered as `(locked)`. A poisoned lock is still
    /// printed, using the data it protects.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "ReplicaGroup {{ group_name: {}, primary: {:?}, secondaries: ",
            self.group_name, self.primary
        )?;

        match self.secondaries.try_read() {
            Ok(secondaries) => write!(f, "{:?}", *secondaries)?,
            // Poisoning only records that a writer panicked; the list itself
            // is still readable, so show it rather than failing to format.
            Err(std::sync::TryLockError::Poisoned(poisoned)) => {
                write!(f, "{:?}", *poisoned.into_inner())?
            }
            Err(std::sync::TryLockError::WouldBlock) => f.write_str("(locked)")?,
        }

        write!(
            f,
            ", version: {}, term: {} }}",
            self.version.load(Ordering::Relaxed),
            self.term
        )
    }
}
impl<C> ReplicaGroupWrapper<C>
where
    C: TypeConfig,
{
    /// Advances the version counter by one.
    fn inc_version(&self) {
        self.version.fetch_add(1, Ordering::Relaxed);
    }

    /// Appends `node_id` to the list of secondaries and bumps the version.
    ///
    /// # Panics
    ///
    /// Panics if `node_id` is already a secondary, or if the secondaries lock
    /// has been poisoned by an earlier panic.
    fn add_secondary(&self, node_id: C::NodeId) {
        let mut secondaries = self
            .secondaries
            .write()
            .expect("secondaries lock poisoned");
        assert!(
            !secondaries.contains(&node_id),
            "node {:?} is already a secondary of group {}",
            node_id,
            self.group_name
        );
        secondaries.push(node_id);
        // The write guard is still alive here, so the version changes while
        // the list is exclusively held.
        self.inc_version();
    }

    /// Removes `node_id` from the list of secondaries and bumps the version.
    ///
    /// The relative order of the remaining secondaries is preserved.
    ///
    /// # Panics
    ///
    /// Panics if `node_id` is not currently a secondary, or if the secondaries
    /// lock has been poisoned by an earlier panic.
    pub fn remove_secondary(&self, node_id: &C::NodeId) {
        let mut secondaries = self
            .secondaries
            .write()
            .expect("secondaries lock poisoned");
        // A single lookup both validates the precondition and yields the
        // index to remove; no separate assertion is needed.
        let index = secondaries
            .iter()
            .position(|candidate| candidate == node_id)
            .unwrap_or_else(|| {
                panic!(
                    "node {:?} is not a secondary of group {}",
                    node_id, self.group_name
                )
            });
        secondaries.remove(index);
        self.inc_version();
    }
}
impl<C> ReplicaGroup<C>
where
    C: TypeConfig,
{
    /// Builds a group handle from its initial state.
    ///
    /// * `group_name` - name of the group.
    /// * `version` - starting value of the version counter.
    /// * `term` - term value stored with the group.
    /// * `primary` - node id of the primary.
    /// * `secondaries` - node ids of the initial secondaries, stored as given.
    pub fn new(
        group_name: String,
        version: usize,
        term: usize,
        primary: C::NodeId,
        secondaries: Vec<C::NodeId>,
    ) -> Self {
        Self {
            inner: Arc::new(ReplicaGroupWrapper {
                group_name,
                primary,
                secondaries: RwLock::new(secondaries),
                version: AtomicUsize::new(version),
                term,
            }),
        }
    }

    /// Returns `true` when the node id of `replica_id` equals this group's
    /// primary. Only the node id is compared.
    pub fn is_primary(&self, replica_id: &ReplicaId<C::NodeId>) -> bool {
        let primary = &self.inner.primary;
        primary.eq(&replica_id.node_id())
    }

    /// Returns `true` when the node id of `replica_id` is currently listed
    /// among this group's secondaries. Only the node id is compared.
    ///
    /// # Panics
    ///
    /// Panics if the secondaries lock is poisoned.
    pub fn is_secondary(&self, replica_id: &ReplicaId<C::NodeId>) -> bool {
        let secondaries = self.inner.secondaries.read().unwrap();
        secondaries.contains(&replica_id.node_id())
    }

    /// Returns the replica id of the primary, built from the group name and
    /// the primary's node id.
    pub fn primary_id(&self) -> ReplicaId<C::NodeId> {
        let group_name = self.inner.group_name.clone();
        let primary = self.inner.primary.clone();
        ReplicaId::new(group_name, primary)
    }

    /// Returns one replica id per current secondary, in list order, each
    /// built from the group name and that secondary's node id.
    ///
    /// # Panics
    ///
    /// Panics if the secondaries lock is poisoned.
    pub fn secondary_ids(&self) -> Vec<ReplicaId<C::NodeId>> {
        let secondaries = self.inner.secondaries.read().unwrap();
        let mut ids = Vec::with_capacity(secondaries.len());
        for node_id in secondaries.iter() {
            ids.push(ReplicaId::<C::NodeId>::new(
                self.inner.group_name.to_string(),
                node_id.clone(),
            ));
        }
        ids
    }

    /// Returns the current value of the version counter.
    pub fn version(&self) -> usize {
        self.inner.version.load(Ordering::Relaxed)
    }

    /// Returns the term stored with the group.
    pub fn term(&self) -> usize {
        self.inner.term
    }

    /// Returns an owned copy of the group name.
    pub fn group_name(&self) -> String {
        self.inner.group_name.clone()
    }

    /// Removes `node_id` from the secondaries of the shared group state.
    ///
    /// The change is made on the state shared through the inner `Arc`, so it
    /// is visible through every clone of this handle.
    ///
    /// # Panics
    ///
    /// Panics if `node_id` is not currently a secondary, or if the secondaries
    /// lock is poisoned.
    pub fn remove_secondary(&mut self, node_id: &C::NodeId) {
        let state = &self.inner;
        state.remove_secondary(node_id)
    }

    /// Adds `node_id` to the secondaries of the shared group state.
    ///
    /// The change is made on the state shared through the inner `Arc`, so it
    /// is visible through every clone of this handle.
    ///
    /// # Panics
    ///
    /// Panics if `node_id` is already a secondary, or if the secondaries lock
    /// is poisoned.
    pub fn add_secondary(&mut self, node_id: C::NodeId) {
        let state = &self.inner;
        state.add_secondary(node_id);
    }
}