1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//! RPC用のメッセージ群.
//!
//! なおRaftの論文に倣って"RPC"という呼称を採用しているが、
//! 実際にここで想定されている通信モデルは、RPCではなく
//! 非同期のメッセージ送受信モデル、となっている.
use election::Term;
use log::{LogIndex, LogPosition, LogPrefix, LogSuffix};
use node::NodeId;

/// RPC用のメッセージ全般.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub enum Message {
    RequestVoteCall(RequestVoteCall),
    RequestVoteReply(RequestVoteReply),
    AppendEntriesCall(AppendEntriesCall),
    AppendEntriesReply(AppendEntriesReply),
    InstallSnapshotCast(InstallSnapshotCast),
}
impl Message {
    /// メッセージのヘッダを返す.
    pub fn header(&self) -> &MessageHeader {
        match self {
            Message::RequestVoteCall(m) => &m.header,
            Message::RequestVoteReply(m) => &m.header,
            Message::AppendEntriesCall(m) => &m.header,
            Message::AppendEntriesReply(m) => &m.header,
            Message::InstallSnapshotCast(m) => &m.header,
        }
    }

    pub(crate) fn set_destination(&mut self, dst: &NodeId) {
        match self {
            Message::RequestVoteCall(m) => {
                m.header.destination = dst.clone();
            }
            Message::RequestVoteReply(m) => {
                m.header.destination = dst.clone();
            }
            Message::AppendEntriesCall(m) => {
                m.header.destination = dst.clone();
            }
            Message::AppendEntriesReply(m) => {
                m.header.destination = dst.clone();
            }
            Message::InstallSnapshotCast(m) => {
                m.header.destination = dst.clone();
            }
        }
    }
}
impl From<RequestVoteCall> for Message {
    fn from(f: RequestVoteCall) -> Self {
        Message::RequestVoteCall(f)
    }
}
impl From<RequestVoteReply> for Message {
    fn from(f: RequestVoteReply) -> Self {
        Message::RequestVoteReply(f)
    }
}
impl From<AppendEntriesCall> for Message {
    fn from(f: AppendEntriesCall) -> Self {
        Message::AppendEntriesCall(f)
    }
}
impl From<AppendEntriesReply> for Message {
    fn from(f: AppendEntriesReply) -> Self {
        Message::AppendEntriesReply(f)
    }
}
impl From<InstallSnapshotCast> for Message {
    fn from(f: InstallSnapshotCast) -> Self {
        Message::InstallSnapshotCast(f)
    }
}

/// メッセージのヘッダ.
#[derive(Debug, Clone)]
pub struct MessageHeader {
    /// メッセージの送信元.
    pub sender: NodeId,

    // FIXME: ヘッダには含めないようにする
    /// メッセージの宛先
    pub destination: NodeId,

    /// シーケンス番号.
    pub seq_no: SequenceNumber,

    /// 送信者の現在の`Term`.
    pub term: Term,
}

/// `RequestVoteRPC`の要求メッセージ.
#[derive(Debug, Clone)]
pub struct RequestVoteCall {
    /// メッセージヘッダ.
    pub header: MessageHeader,

    /// 送信者のログの終端位置.
    pub log_tail: LogPosition,
}

/// `RequestVoteRPC`の応答メッセージ.
#[derive(Debug, Clone)]
pub struct RequestVoteReply {
    /// メッセージヘッダ.
    pub header: MessageHeader,

    /// 投票を行ったかどうか.
    pub voted: bool,
}

/// `AppendEntriesRPC`の要求メッセージ.
#[derive(Debug, Clone)]
pub struct AppendEntriesCall {
    /// メッセージヘッダ.
    pub header: MessageHeader,

    /// コミット済みログの終端インデックス.
    pub committed_log_tail: LogIndex,

    /// 追記対象となるログの末尾部分.
    pub suffix: LogSuffix,
}

/// `AppendEntriesRPC`の応答メッセージ.
#[derive(Debug, Clone)]
pub struct AppendEntriesReply {
    /// メッセージヘッダ.
    pub header: MessageHeader,

    /// 応答者(follower)のログの終端位置.
    ///
    /// これは「実際のログの終端」というよりは、
    /// 「リーダに次に送って貰いたい末尾部分の開始位置」的な意味合いを有する.
    ///
    /// それを考慮すると`next_head`といった名前の方が適切かもしれない.
    pub log_tail: LogPosition,

    /// 応答者が忙しいかどうか.
    ///
    /// この値が`true`の場合には、
    /// followerの`log_tail`が遅れていたとしても、
    /// リーダはログの同期のための追加のメッセージ送信を行わない.
    pub busy: bool,
}

/// `InstallSnapshotRPC`用のメッセージ.
///
/// 論文中では、これも他のRPC同様に"要求・応答"形式となっているが、
/// 他のRPCとは異なり、これに関しては本質的には応答は不要なので、
/// ここでは一方的な送信のみをサポートしている.
#[derive(Debug, Clone)]
pub struct InstallSnapshotCast {
    /// メッセージヘッダ.
    pub header: MessageHeader,

    /// 保存対象となるログの前半部分(i.e., スナップショット).
    pub prefix: LogPrefix,
}

/// メッセージのシーケンス番号.
///
/// この番号はノード毎に管理され、要求系のメッセージ送信の度にインクリメントされる.
/// 応答系のメッセージでは、対応する要求メッセージのシーケンス番号が使用される.
///
/// シーケンス番号は、一つの`Term`内では単調増加することが保証されている.
/// 逆に言えば、複数の`Term`を跨いだ場合には、シーケンス番号が増加する保証は無い.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct SequenceNumber(u64);
impl SequenceNumber {
    /// 新しい`SequenceNumber`インスタンスを生成する.
    pub fn new(num: u64) -> Self {
        SequenceNumber(num)
    }

    /// シーケンス番号の値を返す.
    pub fn as_u64(self) -> u64 {
        self.0
    }
}