use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, info, warn};
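/// Tunable replication settings: how many copies of each shard to keep, how
/// writes are acknowledged, how replicas are placed across the cluster
/// topology, and the health and failover thresholds used by `ReplicaManager`.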
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
pub default_replication_factor: u32,
pub min_write_replicas: u32,
pub write_mode: WriteAckMode,
pub placement_strategy: PlacementStrategy,
pub sync_timeout_ms: u64,
pub health_check_interval_ms: u64,
pub max_lag_ms: u64,
pub auto_failover: bool,
pub min_healthy_replicas: u32,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
default_replication_factor: 3,
min_write_replicas: 2,
write_mode: WriteAckMode::Majority,
placement_strategy: PlacementStrategy::RackAware,
sync_timeout_ms: 5000,
health_check_interval_ms: 1000,
max_lag_ms: 30000,
auto_failover: true,
min_healthy_replicas: 1,
}
}
}
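/// How many replicas must acknowledge a write before it is considered accepted.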
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WriteAckMode {
PrimaryOnly,
Majority,
All,
Quorum(u32),
}
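/// Strategy for spreading a shard's replicas across the registered node topology.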
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PlacementStrategy {
RackAware,
ZoneAware,
Random,
LocalityFirst,
TagBased,
}
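/// Lifecycle state of a single replica within a replica set.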
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaState {
InSync,
Syncing,
Lagging,
Offline,
Initializing,
Decommissioning,
}
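/// Physical placement of a node (datacenter, zone, rack) plus free-form tags,
/// used by the placement strategies to spread replicas across failure domains.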
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeTopology {
pub node_id: String,
pub datacenter: String,
pub zone: String,
pub rack: String,
pub tags: HashMap<String, String>,
}
impl NodeTopology {
pub fn new(node_id: String) -> Self {
Self {
node_id,
datacenter: "dc1".to_string(),
zone: "zone-a".to_string(),
rack: "rack-1".to_string(),
tags: HashMap::new(),
}
}
pub fn with_location(mut self, datacenter: &str, zone: &str, rack: &str) -> Self {
self.datacenter = datacenter.to_string();
self.zone = zone.to_string();
self.rack = rack.to_string();
self
}
pub fn with_tag(mut self, key: &str, value: &str) -> Self {
self.tags.insert(key.to_string(), value.to_string());
self
}
}
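/// Per-replica bookkeeping: role, sync state, replication lag, and failure count.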
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaInfo {
pub node_id: String,
pub state: ReplicaState,
pub is_primary: bool,
pub lag_ms: u64,
pub last_sync: Option<u64>,
pub failure_count: u32,
pub created_at: u64,
}
impl ReplicaInfo {
pub fn new(node_id: String, is_primary: bool) -> Self {
Self {
node_id,
state: if is_primary {
ReplicaState::InSync
} else {
ReplicaState::Initializing
},
is_primary,
lag_ms: 0,
last_sync: None,
failure_count: 0,
created_at: current_time_ms(),
}
}
pub fn is_healthy(&self) -> bool {
matches!(self.state, ReplicaState::InSync | ReplicaState::Syncing)
}
}
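/// All replicas of a single shard, including which node currently acts as
/// primary; `version` is bumped on every membership or state change.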
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaSet {
pub shard_id: String,
pub replication_factor: u32,
pub primary: Option<String>,
pub replicas: HashMap<String, ReplicaInfo>,
pub target_replicas: u32,
pub version: u64,
}
impl ReplicaSet {
pub fn new(shard_id: String, replication_factor: u32) -> Self {
Self {
shard_id,
replication_factor,
primary: None,
replicas: HashMap::new(),
target_replicas: replication_factor,
version: 0,
}
}
pub fn healthy_count(&self) -> usize {
self.replicas.values().filter(|r| r.is_healthy()).count()
}
pub fn in_sync_count(&self) -> usize {
self.replicas
.values()
.filter(|r| r.state == ReplicaState::InSync)
.count()
}
pub fn has_quorum(&self) -> bool {
let required = (self.replication_factor / 2) + 1;
self.healthy_count() >= required as usize
}
pub fn can_write(&self, mode: WriteAckMode) -> bool {
let healthy = self.healthy_count() as u32;
match mode {
            // PrimaryOnly only requires that a primary is assigned; its health
            // is not checked here.
            WriteAckMode::PrimaryOnly => self.primary.is_some(),
WriteAckMode::Majority => healthy > (self.replication_factor / 2),
WriteAckMode::All => healthy >= self.replication_factor,
WriteAckMode::Quorum(n) => healthy >= n,
}
}
pub fn get_in_sync_replicas(&self) -> Vec<String> {
self.replicas
.iter()
.filter(|(_, r)| r.state == ReplicaState::InSync)
.map(|(id, _)| id.clone())
.collect()
}
pub fn get_healthy_replicas(&self) -> Vec<String> {
self.replicas
.iter()
.filter(|(_, r)| r.is_healthy())
.map(|(id, _)| id.clone())
.collect()
}
}
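/// Tracks cluster topology and manages replica sets for all shards: placement,
/// state updates, failure accounting, and primary failover.
///
/// A minimal usage sketch; node and shard identifiers below are illustrative:
///
/// ```ignore
/// let manager = ReplicaManager::new(ReplicationConfig::default());
/// for i in 0..3 {
///     let topo = NodeTopology::new(format!("node-{}", i))
///         .with_location("dc1", "zone-a", &format!("rack-{}", i));
///     manager.register_node(topo);
/// }
/// let set = manager.create_replica_set("shard-1", None).unwrap();
/// assert!(set.primary.is_some());
/// ```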
pub struct ReplicaManager {
config: ReplicationConfig,
replica_sets: Arc<RwLock<HashMap<String, ReplicaSet>>>,
node_topology: Arc<RwLock<HashMap<String, NodeTopology>>>,
available_nodes: Arc<RwLock<HashSet<String>>>,
}
impl ReplicaManager {
pub fn new(config: ReplicationConfig) -> Self {
Self {
config,
replica_sets: Arc::new(RwLock::new(HashMap::new())),
node_topology: Arc::new(RwLock::new(HashMap::new())),
available_nodes: Arc::new(RwLock::new(HashSet::new())),
}
}
pub fn register_node(&self, topology: NodeTopology) {
let node_id = topology.node_id.clone();
self.node_topology.write().insert(node_id.clone(), topology);
self.available_nodes.write().insert(node_id.clone());
info!(node_id = %node_id, "Registered node for replica placement");
}
pub fn deregister_node(&self, node_id: &str) {
self.available_nodes.write().remove(node_id);
info!(node_id = %node_id, "Deregistered node from replica placement");
}
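    /// Creates a replica set for `shard_id`, choosing nodes with the configured
    /// placement strategy; the first selected node becomes primary. Fails with
    /// `InsufficientNodes` if fewer nodes are registered than the requested
    /// replication factor.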
pub fn create_replica_set(
&self,
shard_id: &str,
replication_factor: Option<u32>,
) -> Result<ReplicaSet, ReplicationError> {
let rf = replication_factor.unwrap_or(self.config.default_replication_factor);
        // Snapshot the available nodes so the read lock is released before the
        // `replica_sets` write lock is taken below, keeping lock acquisition
        // order consistent with `add_replica`.
        let available = self.available_nodes.read().clone();
if available.len() < rf as usize {
return Err(ReplicationError::InsufficientNodes {
required: rf as usize,
available: available.len(),
});
}
let mut replica_set = ReplicaSet::new(shard_id.to_string(), rf);
let selected_nodes = self.select_nodes_for_placement(&available, rf as usize)?;
let primary_node = selected_nodes[0].clone();
replica_set.primary = Some(primary_node.clone());
replica_set
.replicas
.insert(primary_node.clone(), ReplicaInfo::new(primary_node, true));
for node_id in selected_nodes.into_iter().skip(1) {
replica_set
.replicas
.insert(node_id.clone(), ReplicaInfo::new(node_id, false));
}
self.replica_sets
.write()
.insert(shard_id.to_string(), replica_set.clone());
info!(
shard_id = %shard_id,
replication_factor = rf,
"Created replica set"
);
Ok(replica_set)
}
fn select_nodes_for_placement(
&self,
available: &HashSet<String>,
count: usize,
) -> Result<Vec<String>, ReplicationError> {
let topology = self.node_topology.read();
match self.config.placement_strategy {
PlacementStrategy::RackAware => self.select_rack_aware(available, &topology, count),
PlacementStrategy::ZoneAware => self.select_zone_aware(available, &topology, count),
PlacementStrategy::Random => self.select_random(available, count),
PlacementStrategy::LocalityFirst => {
self.select_locality_first(available, &topology, count)
}
            PlacementStrategy::TagBased => {
                // Tag-based placement currently falls back to random selection.
                self.select_random(available, count)
            }
        }
}
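    /// Rack-aware selection: place at most one replica per rack first, then
    /// reuse racks only if there are not enough distinct racks to reach `count`.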
fn select_rack_aware(
&self,
available: &HashSet<String>,
topology: &HashMap<String, NodeTopology>,
count: usize,
) -> Result<Vec<String>, ReplicationError> {
let mut selected = Vec::new();
let mut used_racks: HashSet<String> = HashSet::new();
for node_id in available {
if let Some(topo) = topology.get(node_id) {
if !used_racks.contains(&topo.rack) {
selected.push(node_id.clone());
used_racks.insert(topo.rack.clone());
if selected.len() >= count {
return Ok(selected);
}
}
}
}
for node_id in available {
if !selected.contains(node_id) {
selected.push(node_id.clone());
if selected.len() >= count {
return Ok(selected);
}
}
}
if selected.len() >= count {
Ok(selected)
} else {
Err(ReplicationError::InsufficientNodes {
required: count,
available: selected.len(),
})
}
}
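    /// Zone-aware selection: one replica per zone where possible, reusing zones
    /// only when there are not enough distinct zones to reach `count`.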
fn select_zone_aware(
&self,
available: &HashSet<String>,
topology: &HashMap<String, NodeTopology>,
count: usize,
) -> Result<Vec<String>, ReplicationError> {
let mut selected = Vec::new();
let mut used_zones: HashSet<String> = HashSet::new();
for node_id in available {
if let Some(topo) = topology.get(node_id) {
if !used_zones.contains(&topo.zone) {
selected.push(node_id.clone());
used_zones.insert(topo.zone.clone());
if selected.len() >= count {
return Ok(selected);
}
}
}
}
for node_id in available {
if !selected.contains(node_id) {
selected.push(node_id.clone());
if selected.len() >= count {
return Ok(selected);
}
}
}
if selected.len() >= count {
Ok(selected)
} else {
Err(ReplicationError::InsufficientNodes {
required: count,
available: selected.len(),
})
}
}
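    /// "Random" selection takes the first `count` nodes from the set's
    /// unspecified iteration order; the choice is arbitrary rather than
    /// uniformly random, which avoids an RNG dependency.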
fn select_random(
&self,
available: &HashSet<String>,
count: usize,
) -> Result<Vec<String>, ReplicationError> {
let nodes: Vec<String> = available.iter().cloned().collect();
if nodes.len() < count {
return Err(ReplicationError::InsufficientNodes {
required: count,
available: nodes.len(),
});
}
Ok(nodes.into_iter().take(count).collect())
}
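    /// Locality-first selection: pick a reference node, prefer nodes sharing its
    /// rack or zone, then fill from any remaining nodes if needed.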
fn select_locality_first(
&self,
available: &HashSet<String>,
topology: &HashMap<String, NodeTopology>,
count: usize,
) -> Result<Vec<String>, ReplicationError> {
let mut selected = Vec::new();
let mut reference_topo: Option<&NodeTopology> = None;
for node_id in available {
if let Some(topo) = topology.get(node_id) {
if selected.is_empty() {
selected.push(node_id.clone());
reference_topo = Some(topo);
} else if let Some(ref_topo) = reference_topo {
if topo.rack == ref_topo.rack || topo.zone == ref_topo.zone {
selected.push(node_id.clone());
}
}
if selected.len() >= count {
return Ok(selected);
}
}
}
for node_id in available {
if !selected.contains(node_id) {
selected.push(node_id.clone());
if selected.len() >= count {
return Ok(selected);
}
}
}
if selected.len() >= count {
Ok(selected)
} else {
Err(ReplicationError::InsufficientNodes {
required: count,
available: selected.len(),
})
}
}
pub fn get_replica_set(&self, shard_id: &str) -> Option<ReplicaSet> {
self.replica_sets.read().get(shard_id).cloned()
}
pub fn update_replica_state(
&self,
shard_id: &str,
node_id: &str,
state: ReplicaState,
lag_ms: Option<u64>,
) -> Result<(), ReplicationError> {
let mut sets = self.replica_sets.write();
let set = sets
.get_mut(shard_id)
.ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
let replica = set
.replicas
.get_mut(node_id)
.ok_or_else(|| ReplicationError::ReplicaNotFound(node_id.to_string()))?;
let old_state = replica.state;
replica.state = state;
if let Some(lag) = lag_ms {
replica.lag_ms = lag;
}
if state == ReplicaState::InSync {
replica.last_sync = Some(current_time_ms());
replica.failure_count = 0;
}
set.version += 1;
if old_state != state {
debug!(
shard_id = %shard_id,
node_id = %node_id,
old_state = ?old_state,
new_state = ?state,
"Replica state changed"
);
}
Ok(())
}
pub fn record_replica_failure(
&self,
shard_id: &str,
node_id: &str,
) -> Result<(), ReplicationError> {
let mut sets = self.replica_sets.write();
let set = sets
.get_mut(shard_id)
.ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
if let Some(replica) = set.replicas.get_mut(node_id) {
replica.failure_count += 1;
            // Fixed threshold: three recorded failures without an intervening
            // successful sync mark the replica offline.
            if replica.failure_count >= 3 {
replica.state = ReplicaState::Offline;
warn!(
shard_id = %shard_id,
node_id = %node_id,
failure_count = replica.failure_count,
"Replica marked offline due to failures"
);
}
set.version += 1;
}
Ok(())
}
pub fn promote_replica(
&self,
shard_id: &str,
new_primary_node: &str,
) -> Result<(), ReplicationError> {
let mut sets = self.replica_sets.write();
let set = sets
.get_mut(shard_id)
.ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
let replica = set
.replicas
.get(new_primary_node)
.ok_or_else(|| ReplicationError::ReplicaNotFound(new_primary_node.to_string()))?;
if !replica.is_healthy() {
return Err(ReplicationError::UnhealthyReplica(
new_primary_node.to_string(),
));
}
if let Some(old_primary) = &set.primary {
if let Some(old_replica) = set.replicas.get_mut(old_primary) {
old_replica.is_primary = false;
}
}
if let Some(new_replica) = set.replicas.get_mut(new_primary_node) {
new_replica.is_primary = true;
}
let old_primary = set.primary.clone();
set.primary = Some(new_primary_node.to_string());
set.version += 1;
info!(
shard_id = %shard_id,
old_primary = ?old_primary,
new_primary = %new_primary_node,
"Promoted replica to primary"
);
Ok(())
}
pub fn add_replica(&self, shard_id: &str, node_id: &str) -> Result<(), ReplicationError> {
let mut sets = self.replica_sets.write();
let set = sets
.get_mut(shard_id)
.ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
if set.replicas.contains_key(node_id) {
return Err(ReplicationError::ReplicaExists(node_id.to_string()));
}
if !self.available_nodes.read().contains(node_id) {
return Err(ReplicationError::NodeNotAvailable(node_id.to_string()));
}
set.replicas.insert(
node_id.to_string(),
ReplicaInfo::new(node_id.to_string(), false),
);
set.version += 1;
info!(
shard_id = %shard_id,
node_id = %node_id,
"Added new replica"
);
Ok(())
}
pub fn remove_replica(&self, shard_id: &str, node_id: &str) -> Result<(), ReplicationError> {
let mut sets = self.replica_sets.write();
let set = sets
.get_mut(shard_id)
.ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
if set.primary.as_deref() == Some(node_id) {
return Err(ReplicationError::CannotRemovePrimary);
}
        // Never shrink the set below the configured floor; this counts all
        // replicas, healthy or not.
        if set.replicas.len() <= self.config.min_healthy_replicas as usize {
return Err(ReplicationError::MinimumReplicasRequired);
}
set.replicas.remove(node_id);
set.version += 1;
info!(
shard_id = %shard_id,
node_id = %node_id,
"Removed replica"
);
Ok(())
}
pub fn check_under_replicated(&self) -> Vec<(String, usize)> {
let sets = self.replica_sets.read();
let mut under_replicated = Vec::new();
for (shard_id, set) in sets.iter() {
let healthy = set.healthy_count();
if healthy < set.replication_factor as usize {
under_replicated
.push((shard_id.clone(), set.replication_factor as usize - healthy));
}
}
under_replicated
}
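    /// Attempts automatic failover for `shard_id`. Returns `Ok(None)` if
    /// auto-failover is disabled, the current primary is still healthy, or no
    /// in-sync candidate exists; otherwise promotes the in-sync replica with the
    /// lowest lag and returns its node id.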
pub fn auto_failover(&self, shard_id: &str) -> Result<Option<String>, ReplicationError> {
if !self.config.auto_failover {
return Ok(None);
}
let sets = self.replica_sets.read();
let set = sets
.get(shard_id)
.ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
if let Some(primary) = &set.primary {
if let Some(replica) = set.replicas.get(primary) {
if replica.is_healthy() {
                    return Ok(None);
                }
}
}
let candidate = set
.replicas
.iter()
.filter(|(_, r)| !r.is_primary && r.state == ReplicaState::InSync)
.min_by_key(|(_, r)| r.lag_ms)
.map(|(id, _)| id.clone());
        // Release the read lock before promoting; `promote_replica` re-acquires
        // `replica_sets` for writing.
        drop(sets);
if let Some(new_primary) = candidate.clone() {
self.promote_replica(shard_id, &new_primary)?;
}
Ok(candidate)
}
pub fn get_stats(&self) -> ReplicationStats {
let sets = self.replica_sets.read();
let nodes = self.available_nodes.read();
let mut total_replicas = 0;
let mut healthy_replicas = 0;
let mut in_sync_replicas = 0;
let mut under_replicated = 0;
for set in sets.values() {
total_replicas += set.replicas.len();
healthy_replicas += set.healthy_count();
in_sync_replicas += set.in_sync_count();
if set.healthy_count() < set.replication_factor as usize {
under_replicated += 1;
}
}
ReplicationStats {
total_replica_sets: sets.len(),
total_replicas,
healthy_replicas,
in_sync_replicas,
under_replicated_shards: under_replicated,
available_nodes: nodes.len(),
}
}
pub fn config(&self) -> &ReplicationConfig {
&self.config
}
}
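/// Point-in-time summary of replica health across all replica sets.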
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStats {
pub total_replica_sets: usize,
pub total_replicas: usize,
pub healthy_replicas: usize,
pub in_sync_replicas: usize,
pub under_replicated_shards: usize,
pub available_nodes: usize,
}
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReplicationError {
#[error("Insufficient nodes for replication: required {required}, available {available}")]
InsufficientNodes { required: usize, available: usize },
#[error("Shard not found: {0}")]
ShardNotFound(String),
#[error("Replica not found: {0}")]
ReplicaNotFound(String),
#[error("Replica already exists: {0}")]
ReplicaExists(String),
#[error("Node not available: {0}")]
NodeNotAvailable(String),
#[error("Cannot remove primary replica")]
CannotRemovePrimary,
#[error("Minimum replicas required")]
MinimumReplicasRequired,
#[error("Replica is unhealthy: {0}")]
UnhealthyReplica(String),
#[error("No healthy replicas available")]
NoHealthyReplicas,
}
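/// Current wall-clock time in milliseconds since the Unix epoch (0 if the
/// system clock reads earlier than the epoch).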
fn current_time_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_manager() -> ReplicaManager {
let config = ReplicationConfig {
default_replication_factor: 3,
min_write_replicas: 2,
..Default::default()
};
let manager = ReplicaManager::new(config);
for i in 0..5 {
let topo = NodeTopology::new(format!("node-{}", i)).with_location(
"dc1",
&format!("zone-{}", i % 2),
&format!("rack-{}", i),
);
manager.register_node(topo);
}
manager
}
#[test]
fn test_create_replica_set() {
let manager = create_test_manager();
let result = manager.create_replica_set("shard-1", None);
assert!(result.is_ok());
let set = result.unwrap();
assert_eq!(set.shard_id, "shard-1");
assert_eq!(set.replication_factor, 3);
assert_eq!(set.replicas.len(), 3);
assert!(set.primary.is_some());
}
#[test]
fn test_replica_set_with_custom_factor() {
let manager = create_test_manager();
let result = manager.create_replica_set("shard-2", Some(2));
assert!(result.is_ok());
let set = result.unwrap();
assert_eq!(set.replication_factor, 2);
assert_eq!(set.replicas.len(), 2);
}
#[test]
fn test_insufficient_nodes() {
let config = ReplicationConfig::default();
let manager = ReplicaManager::new(config);
manager.register_node(NodeTopology::new("node-1".to_string()));
manager.register_node(NodeTopology::new("node-2".to_string()));
let result = manager.create_replica_set("shard-1", Some(3));
assert!(matches!(
result,
Err(ReplicationError::InsufficientNodes { .. })
));
}
#[test]
fn test_update_replica_state() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", None).unwrap();
let set = manager.get_replica_set("shard-1").unwrap();
let node_id = set.replicas.keys().next().unwrap().clone();
let result =
manager.update_replica_state("shard-1", &node_id, ReplicaState::Syncing, Some(100));
assert!(result.is_ok());
let updated_set = manager.get_replica_set("shard-1").unwrap();
let replica = updated_set.replicas.get(&node_id).unwrap();
assert_eq!(replica.state, ReplicaState::Syncing);
assert_eq!(replica.lag_ms, 100);
}
#[test]
fn test_promote_replica() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", None).unwrap();
let set = manager.get_replica_set("shard-1").unwrap();
let old_primary = set.primary.clone().unwrap();
let new_primary = set
.replicas
.iter()
.find(|(_, r)| !r.is_primary)
.map(|(id, _)| id.clone())
.unwrap();
manager
.update_replica_state("shard-1", &new_primary, ReplicaState::InSync, None)
.unwrap();
let result = manager.promote_replica("shard-1", &new_primary);
assert!(result.is_ok());
let updated_set = manager.get_replica_set("shard-1").unwrap();
assert_eq!(updated_set.primary, Some(new_primary.clone()));
assert!(!updated_set.replicas.get(&old_primary).unwrap().is_primary);
assert!(updated_set.replicas.get(&new_primary).unwrap().is_primary);
}
#[test]
fn test_add_remove_replica() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", Some(2)).unwrap();
let set = manager.get_replica_set("shard-1").unwrap();
let new_node = (0..5)
.map(|i| format!("node-{}", i))
.find(|n| !set.replicas.contains_key(n))
.expect("Should have available node");
let result = manager.add_replica("shard-1", &new_node);
assert!(result.is_ok());
let set = manager.get_replica_set("shard-1").unwrap();
assert_eq!(set.replicas.len(), 3);
let non_primary = set
.replicas
.iter()
.find(|(_, r)| !r.is_primary)
.map(|(id, _)| id.clone())
.unwrap();
let result = manager.remove_replica("shard-1", &non_primary);
assert!(result.is_ok());
let set = manager.get_replica_set("shard-1").unwrap();
assert_eq!(set.replicas.len(), 2);
}
#[test]
fn test_cannot_remove_primary() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", None).unwrap();
let set = manager.get_replica_set("shard-1").unwrap();
let primary = set.primary.unwrap();
let result = manager.remove_replica("shard-1", &primary);
assert!(matches!(result, Err(ReplicationError::CannotRemovePrimary)));
}
#[test]
fn test_replica_set_quorum() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", Some(3)).unwrap();
let set = manager.get_replica_set("shard-1").unwrap();
for node_id in set.replicas.keys() {
manager
.update_replica_state("shard-1", node_id, ReplicaState::InSync, None)
.unwrap();
}
let set = manager.get_replica_set("shard-1").unwrap();
assert!(set.has_quorum());
assert!(set.can_write(WriteAckMode::Majority));
assert!(set.can_write(WriteAckMode::All));
}
#[test]
fn test_check_under_replicated() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", Some(3)).unwrap();
let set = manager.get_replica_set("shard-1").unwrap();
let node_id = set.replicas.keys().next().unwrap().clone();
manager
.update_replica_state("shard-1", &node_id, ReplicaState::Offline, None)
.unwrap();
let under_replicated = manager.check_under_replicated();
assert_eq!(under_replicated.len(), 1);
assert_eq!(under_replicated[0].0, "shard-1");
}
#[test]
fn test_replication_stats() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", Some(3)).unwrap();
manager.create_replica_set("shard-2", Some(2)).unwrap();
let stats = manager.get_stats();
assert_eq!(stats.total_replica_sets, 2);
assert_eq!(stats.total_replicas, 5);
assert_eq!(stats.available_nodes, 5);
}
#[test]
fn test_rack_aware_placement() {
let config = ReplicationConfig {
default_replication_factor: 3,
placement_strategy: PlacementStrategy::RackAware,
..Default::default()
};
let manager = ReplicaManager::new(config);
for i in 0..5 {
let topo = NodeTopology::new(format!("node-{}", i)).with_location(
"dc1",
"zone-a",
&format!("rack-{}", i),
);
manager.register_node(topo);
}
let set = manager.create_replica_set("shard-1", None).unwrap();
let topology = manager.node_topology.read();
let racks: HashSet<_> = set
.replicas
.keys()
.filter_map(|id| topology.get(id))
.map(|t| t.rack.clone())
.collect();
        assert_eq!(racks.len(), 3);
    }
#[test]
fn test_record_replica_failure() {
let manager = create_test_manager();
manager.create_replica_set("shard-1", None).unwrap();
let set = manager.get_replica_set("shard-1").unwrap();
let node_id = set.replicas.keys().next().unwrap().clone();
for _ in 0..3 {
manager.record_replica_failure("shard-1", &node_id).unwrap();
}
let set = manager.get_replica_set("shard-1").unwrap();
let replica = set.replicas.get(&node_id).unwrap();
assert_eq!(replica.state, ReplicaState::Offline);
assert_eq!(replica.failure_count, 3);
}
}