Skip to main content

raftlog/node_state/follower/
mod.rs

1use self::append::FollowerAppend;
2use self::idle::FollowerIdle;
3use self::init::FollowerInit;
4use self::snapshot::FollowerSnapshot;
5use super::{Common, NextState};
6use election::Role;
7use message::{Message, MessageHeader};
8use {Io, Result};
9
10mod append;
11mod idle;
12mod init;
13mod snapshot;
14
15/// 別の人(ノード)に投票しているフォロワー.
16///
17/// リーダーから送られてきたメッセージを処理して、ログの同期を行う.
18///
19/// タイムアウト時間内にリーダからメッセージを受信しなかった場合には、
20/// その選挙期間は完了したものと判断して、自身が立候補して次の選挙を始める.
21pub enum Follower<IO: Io> {
22    /// 初期化状態 (主に投票状況の保存を行う).
23    Init(FollowerInit<IO>),
24
25    /// リーダからのメッセージ処理が可能な状態.
26    Idle(FollowerIdle<IO>),
27
28    /// ローカルログへの追記中.
29    Append(FollowerAppend<IO>),
30
31    /// ローカルログへのスナップショット保存中.
32    Snapshot(FollowerSnapshot<IO>),
33}
34impl<IO: Io> Follower<IO> {
35    pub fn new(common: &mut Common<IO>, pending_vote: Option<MessageHeader>) -> Self {
36        common.set_timeout(Role::Follower);
37        let follower = FollowerInit::new(common, pending_vote);
38        Follower::Init(follower)
39    }
40    pub fn handle_timeout(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
41        Ok(Some(common.transit_to_candidate()))
42    }
43    pub fn handle_message(
44        &mut self,
45        common: &mut Common<IO>,
46        message: Message,
47    ) -> Result<NextState<IO>> {
48        if let Message::AppendEntriesCall { .. } = message {
49            common.set_timeout(Role::Follower);
50            if unsafe { common.io_mut().is_busy() } {
51                common.rpc_callee(message.header()).reply_busy();
52                return Ok(None);
53            }
54        }
55
56        match *self {
57            Follower::Init(ref mut t) => track!(t.handle_message(common, message)),
58            Follower::Idle(ref mut t) => track!(t.handle_message(common, message)),
59            Follower::Append(ref mut t) => track!(t.handle_message(common, message)),
60            Follower::Snapshot(ref mut t) => track!(t.handle_message(common, message)),
61        }
62    }
63    pub fn run_once(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
64        match *self {
65            Follower::Init(ref mut t) => track!(t.run_once(common)),
66            Follower::Idle(_) => Ok(None),
67            Follower::Append(ref mut t) => track!(t.run_once(common)),
68            Follower::Snapshot(ref mut t) => track!(t.run_once(common)),
69        }
70    }
71}