1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use crate::error::ChangeMembershipError;
use crate::error::InProgress;
use crate::ChangeMembers;
use crate::Membership;
use crate::MembershipState;
use crate::Node;
use crate::NodeId;

/// Helper for validating and computing membership changes against a borrowed
/// [`MembershipState`].
///
/// The handler only holds a shared reference, so it cannot modify the state it inspects.
pub(crate) struct ChangeHandler<'m, NID: NodeId, N: Node> {
    /// The membership state whose committed and effective memberships are consulted.
    pub(crate) state: &'m MembershipState<NID, N>,
}

impl<'m, NID, N> ChangeHandler<'m, NID, N>
where
    NID: NodeId,
    N: Node,
{
    /// Computes the membership that results from applying `change` to the effective
    /// membership.
    ///
    /// The borrowed state is not modified: the effective membership is cloned and the
    /// change is applied to the clone.
    ///
    /// * `change`: the change to apply to the current (effective) membership.
    /// * `retain`: forwarded to `Membership::change`; when `true`, voters removed by the
    ///   change are kept as learners, i.e., nodes that continue to receive log replication
    ///   from the leader.
    ///
    /// # Errors
    ///
    /// * `ChangeMembershipError::InProgress` if the effective membership has not been
    ///   committed yet, see [`Self::ensure_committed`].
    /// * Any error reported by `Membership::change`, e.g. when the change would leave the
    ///   cluster without a voter.
    pub(crate) fn apply(
        &self,
        change: ChangeMembers<NID, N>,
        retain: bool,
    ) -> Result<Membership<NID, N>, ChangeMembershipError<NID>> {
        // Reject the request up front while a previous membership change is still in flight.
        self.ensure_committed()?;

        let current = self.state.effective().membership().clone();
        let updated = current.change(change, retain)?;
        Ok(updated)
    }

    /// Checks that the most recent (effective) membership is already committed.
    ///
    /// Returns `Ok(())` when the effective and committed memberships carry the same log id.
    /// Otherwise returns an `InProgress` error holding both log ids, signalling that a
    /// change-membership request should be rejected.
    pub(crate) fn ensure_committed(&self) -> Result<(), InProgress<NID>> {
        let committed = self.state.committed();
        let effective = self.state.effective();

        let committed_log_id = committed.log_id();
        let effective_log_id = effective.log_id();

        if effective_log_id != committed_log_id {
            // The effective membership is ahead of the committed one.
            return Err(InProgress {
                committed: *committed_log_id,
                membership_log_id: *effective_log_id,
            });
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use maplit::btreemap;
    use maplit::btreeset;

    use crate::error::ChangeMembershipError;
    use crate::error::EmptyMembership;
    use crate::error::InProgress;
    use crate::error::LearnerNotFound;
    use crate::testing::log_id;
    use crate::ChangeMembers;
    use crate::EffectiveMembership;
    use crate::Membership;
    use crate::MembershipState;

    /// Wraps `m` in an `Arc<EffectiveMembership>` stamped with the log id `(term, index)`.
    fn effmem(term: u64, index: u64, m: Membership<u64, ()>) -> Arc<EffectiveMembership<u64, ()>> {
        Arc::new(EffectiveMembership::new(Some(log_id(term, index)), m))
    }

    /// Single config with voter 1.
    fn m1() -> Membership<u64, ()> {
        Membership::new(vec![btreeset! {1}], None)
    }

    /// Single config with voters 1 and 2.
    fn m12() -> Membership<u64, ()> {
        Membership::new(vec![btreeset! {1,2}], None)
    }

    /// Two configs: voters {1,2,3} and voters {3,4,5}.
    fn m123_345() -> Membership<u64, ()> {
        Membership::new(vec![btreeset! {1,2,3}, btreeset! {3,4,5}], None)
    }

    /// A change is rejected while the effective membership is not yet committed.
    #[test]
    fn test_apply_not_committed() -> anyhow::Result<()> {
        // Committed at (2,2), effective at (3,4): the log ids differ.
        let state = MembershipState::new(effmem(2, 2, m1()), effmem(3, 4, m123_345()));
        let change = ChangeMembers::AddVoter(btreeset! {1});

        let res = state.change_handler().apply(change, false);

        let in_progress = InProgress {
            committed: Some(log_id(2, 2)),
            membership_log_id: Some(log_id(3, 4)),
        };
        assert_eq!(Err(ChangeMembershipError::InProgress(in_progress)), res);

        Ok(())
    }

    /// Removing the only voter is rejected with `EmptyMembership`.
    #[test]
    fn test_apply_empty_voters() -> anyhow::Result<()> {
        let state = MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1()));
        let change = ChangeMembers::RemoveVoter(btreeset! {1});

        let res = state.change_handler().apply(change, false);

        assert_eq!(Err(ChangeMembershipError::EmptyMembership(EmptyMembership {})), res);

        Ok(())
    }

    /// Adding a voter that is not present in the membership is rejected with `LearnerNotFound`.
    #[test]
    fn test_apply_learner_not_found() -> anyhow::Result<()> {
        let state = MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1()));
        let change = ChangeMembers::AddVoter(btreeset! {2});

        let res = state.change_handler().apply(change, false);

        let not_found = LearnerNotFound { node_id: 2 };
        assert_eq!(Err(ChangeMembershipError::LearnerNotFound(not_found)), res);

        Ok(())
    }

    /// The `retain` flag decides whether removed voters stay in the membership as learners.
    #[test]
    fn test_apply_retain_learner() -> anyhow::Result<()> {
        let state = MembershipState::new(effmem(3, 4, m12()), effmem(3, 4, m123_345()));

        // retain = false: removed voters 1 and 2 are dropped entirely.
        let dropped = state.change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), false);
        assert_eq!(
            Ok(Membership::new(vec![btreeset! {3,4,5}], btreemap! {3=>(),4=>(),5=>()})),
            dropped
        );

        // retain = true: removed voters 1 and 2 remain as learners.
        let retained = state.change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), true);
        assert_eq!(
            Ok(Membership::new(
                vec![btreeset! {3,4,5}],
                btreemap! {1=>(),2=>(),3=>(),4=>(),5=>()}
            )),
            retained
        );

        Ok(())
    }
}