1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::versioned::Update;
use crate::versioned::UpdateError;
use crate::vote::CommittedLeaderId;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct ReplicationMetrics<NID: NodeId> {
pub replication: BTreeMap<NID, ReplicationTargetMetrics<NID>>,
}
impl<NID: NodeId> MessageSummary<ReplicationMetrics<NID>> for ReplicationMetrics<NID> {
fn summary(&self) -> String {
let mut res = vec!["LeaderMetrics{".to_string()];
for (i, (k, v)) in self.replication.iter().enumerate() {
if i > 0 {
res.push(", ".to_string());
}
res.push(format!("{}:{}", k, v.summary()));
}
res.push("}".to_string());
res.join("")
}
}
pub(crate) struct UpdateMatchedLogId<NID: NodeId> {
pub(crate) target: NID,
pub(crate) matching: LogId<NID>,
}
impl<NID: NodeId> Update<ReplicationMetrics<NID>> for UpdateMatchedLogId<NID> {
fn apply_in_place(&self, to: &Arc<ReplicationMetrics<NID>>) -> Result<(), UpdateError> {
let target_metrics = to.replication.get(&self.target).ok_or(UpdateError::CanNotUpdateInPlace)?;
if target_metrics.matched_leader_id == self.matching.leader_id {
target_metrics.matched_index.store(self.matching.index, Ordering::Relaxed);
return Ok(());
}
Err(UpdateError::CanNotUpdateInPlace)
}
fn apply_mut(&self, to: &mut ReplicationMetrics<NID>) {
to.replication.insert(self.target, ReplicationTargetMetrics {
matched_leader_id: self.matching.leader_id,
matched_index: AtomicU64::new(self.matching.index),
});
}
}
pub(crate) struct RemoveTarget<NID: NodeId> {
pub target: NID,
}
impl<NID: NodeId> Update<ReplicationMetrics<NID>> for RemoveTarget<NID> {
fn apply_in_place(&self, _to: &Arc<ReplicationMetrics<NID>>) -> Result<(), UpdateError> {
Err(UpdateError::CanNotUpdateInPlace)
}
fn apply_mut(&self, to: &mut ReplicationMetrics<NID>) {
to.replication.remove(&self.target);
}
}
#[derive(Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct ReplicationTargetMetrics<NID: NodeId> {
pub(crate) matched_leader_id: CommittedLeaderId<NID>,
pub(crate) matched_index: AtomicU64,
}
impl<NID: NodeId> Clone for ReplicationTargetMetrics<NID> {
fn clone(&self) -> Self {
Self {
matched_leader_id: self.matched_leader_id,
matched_index: AtomicU64::new(self.matched_index.load(Ordering::Relaxed)),
}
}
}
impl<NID: NodeId> PartialEq for ReplicationTargetMetrics<NID> {
fn eq(&self, other: &Self) -> bool {
self.matched_leader_id == other.matched_leader_id
&& self.matched_index.load(Ordering::Relaxed) == other.matched_index.load(Ordering::Relaxed)
}
}
impl<NID: NodeId> Eq for ReplicationTargetMetrics<NID> {}
impl<NID: NodeId> ReplicationTargetMetrics<NID> {
pub fn new(log_id: LogId<NID>) -> Self {
Self {
matched_leader_id: log_id.leader_id,
matched_index: AtomicU64::new(log_id.index),
}
}
pub fn matched(&self) -> LogId<NID> {
let index = self.matched_index.load(Ordering::Relaxed);
LogId {
leader_id: self.matched_leader_id,
index,
}
}
}
impl<NID: NodeId> MessageSummary<ReplicationTargetMetrics<NID>> for ReplicationTargetMetrics<NID> {
fn summary(&self) -> String {
format!("{}", self.matched())
}
}