1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use crate::versioned::Update;
use crate::versioned::UpdateError;
use crate::vote::CommittedLeaderId;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;

/// Replication metrics kept about the leader: one entry per replication target.
///
/// NOTE(review): per the original comment this is held as `Some(..)` only while this node is the
/// leader; the wrapping `Option` is declared outside this file — confirm against the owner.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct ReplicationMetrics<NID: NodeId> {
    /// Replication metrics of every known replication target (voters and learners), keyed by the
    /// target's node id.
    pub replication: BTreeMap<NID, ReplicationTargetMetrics<NID>>,
}

impl<NID: NodeId> MessageSummary<ReplicationMetrics<NID>> for ReplicationMetrics<NID> {
    /// Renders all targets as `LeaderMetrics{<id>:<summary>, <id>:<summary>, ...}`.
    ///
    /// Entries follow the iteration order of the underlying `BTreeMap`, i.e. ascending key order.
    /// An empty map renders as `LeaderMetrics{}`.
    fn summary(&self) -> String {
        // One "<node id>:<target summary>" fragment per replication target.
        let entries: Vec<String> = self
            .replication
            .iter()
            .map(|(node_id, target)| format!("{}:{}", node_id, target.summary()))
            .collect();

        let mut out = String::from("LeaderMetrics{");
        out.push_str(&entries.join(", "));
        out.push_str("}");
        out
    }
}

/// An update that records the matched log id of one replication target in
/// `ReplicationMetrics::replication`.
pub(crate) struct UpdateMatchedLogId<NID: NodeId> {
    /// The node whose entry in the replication map is updated.
    pub(crate) target: NID,
    /// The log id to record as matched for `target`.
    pub(crate) matching: LogId<NID>,
}

impl<NID: NodeId> Update<ReplicationMetrics<NID>> for UpdateMatchedLogId<NID> {
    /// Tries to record the new matched index without replacing the target's record.
    ///
    /// This succeeds only when `target` already has a record and that record's leader id equals
    /// the leader id of `matching`; in that case only the atomic index is overwritten.
    ///
    /// # Errors
    ///
    /// Returns `UpdateError::CanNotUpdateInPlace` when there is no record for `target`, or when
    /// the recorded leader id differs (the leader id is a plain field and cannot be changed
    /// through the shared reference).
    fn apply_in_place(&self, to: &Arc<ReplicationMetrics<NID>>) -> Result<(), UpdateError> {
        match to.replication.get(&self.target) {
            Some(existing) if existing.matched_leader_id == self.matching.leader_id => {
                existing.matched_index.store(self.matching.index, Ordering::Relaxed);
                Ok(())
            }
            _ => Err(UpdateError::CanNotUpdateInPlace),
        }
    }

    /// Inserts a freshly built record for `target`, replacing any record already stored for it.
    fn apply_mut(&self, to: &mut ReplicationMetrics<NID>) {
        let record = ReplicationTargetMetrics {
            matched_leader_id: self.matching.leader_id,
            matched_index: AtomicU64::new(self.matching.index),
        };
        to.replication.insert(self.target, record);
    }
}

/// An update that removes the metrics of one replication target from
/// `ReplicationMetrics::replication`.
pub(crate) struct RemoveTarget<NID: NodeId> {
    /// The node whose entry is removed from the replication map.
    // `pub(crate)` rather than `pub`: the struct itself is crate-private, so a wider field
    // visibility has no effect; this matches the fields of `UpdateMatchedLogId`.
    pub(crate) target: NID,
}

impl<NID: NodeId> Update<ReplicationMetrics<NID>> for RemoveTarget<NID> {
    /// Always fails with `UpdateError::CanNotUpdateInPlace`.
    ///
    /// Only a shared reference to the metrics is available here, and removing a key from the map
    /// needs mutable access.
    fn apply_in_place(&self, _to: &Arc<ReplicationMetrics<NID>>) -> Result<(), UpdateError> {
        Err(UpdateError::CanNotUpdateInPlace)
    }

    /// Removes the record stored for `target`; does nothing if there is none.
    fn apply_mut(&self, to: &mut ReplicationMetrics<NID>) {
        let targets = &mut to.replication;
        targets.remove(&self.target);
    }
}

/// The replication metrics of a single target: the log id recorded as matched for it.
///
/// The log id is stored split into its leader id and its index. The index is an `AtomicU64` so
/// that it can be overwritten through a shared reference (see
/// `UpdateMatchedLogId::apply_in_place`).
#[derive(Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct ReplicationTargetMetrics<NID: NodeId> {
    /// Leader id part of the matched log id.
    pub(crate) matched_leader_id: CommittedLeaderId<NID>,
    /// Index part of the matched log id.
    pub(crate) matched_index: AtomicU64,
}

impl<NID: NodeId> Clone for ReplicationTargetMetrics<NID> {
    /// Produces an independent copy holding a snapshot of the current index.
    ///
    /// Written by hand because `AtomicU64` does not implement `Clone`: the index is read with
    /// `Ordering::Relaxed` and wrapped in a new atomic, so later stores to one copy are not seen
    /// by the other.
    fn clone(&self) -> Self {
        let index_snapshot = self.matched_index.load(Ordering::Relaxed);
        let leader_id = self.matched_leader_id;

        Self {
            matched_leader_id: leader_id,
            matched_index: AtomicU64::new(index_snapshot),
        }
    }
}

impl<NID: NodeId> PartialEq for ReplicationTargetMetrics<NID> {
    /// Two values are equal when their leader ids are equal and their indexes, each read with
    /// `Ordering::Relaxed` at the time of the comparison, are equal.
    fn eq(&self, other: &Self) -> bool {
        // Different leader ids settle the comparison without touching the atomics.
        if self.matched_leader_id != other.matched_leader_id {
            return false;
        }

        let own_index = self.matched_index.load(Ordering::Relaxed);
        let other_index = other.matched_index.load(Ordering::Relaxed);
        own_index == other_index
    }
}

// Marker impl only: it adds no methods; the comparison itself is the `PartialEq` impl for this
// type.
impl<NID: NodeId> Eq for ReplicationTargetMetrics<NID> {}

impl<NID: NodeId> ReplicationTargetMetrics<NID> {
    /// Creates the metrics of a target whose matched log id is `log_id`.
    ///
    /// The log id is split into its leader id, stored as is, and its index, stored in a new
    /// `AtomicU64`.
    pub fn new(log_id: LogId<NID>) -> Self {
        let leader_id = log_id.leader_id;
        let index = log_id.index;

        Self {
            matched_leader_id: leader_id,
            matched_index: AtomicU64::new(index),
        }
    }

    /// Returns the matched log id, assembled from the stored leader id and the current value of
    /// the atomic index (read with `Ordering::Relaxed`).
    pub fn matched(&self) -> LogId<NID> {
        LogId {
            leader_id: self.matched_leader_id,
            index: self.matched_index.load(Ordering::Relaxed),
        }
    }
}

impl<NID: NodeId> MessageSummary<ReplicationTargetMetrics<NID>> for ReplicationTargetMetrics<NID> {
    /// Renders the matched log id using its `Display` implementation.
    fn summary(&self) -> String {
        let matched = self.matched();
        matched.to_string()
    }
}