openraft/engine/
command.rs

1use std::fmt::Debug;
2
3use crate::async_runtime::AsyncOneshotSendExt;
4use crate::core::sm;
5use crate::engine::CommandKind;
6use crate::error::Infallible;
7use crate::error::InitializeError;
8use crate::error::InstallSnapshotError;
9use crate::progress::entry::ProgressEntry;
10use crate::progress::Inflight;
11use crate::raft::AppendEntriesResponse;
12use crate::raft::InstallSnapshotResponse;
13use crate::raft::SnapshotResponse;
14use crate::raft::VoteRequest;
15use crate::raft::VoteResponse;
16use crate::type_config::alias::OneshotSenderOf;
17use crate::LeaderId;
18use crate::LogId;
19use crate::NodeId;
20use crate::OptionalSend;
21use crate::RaftTypeConfig;
22use crate::Vote;
23
24/// Commands to send to `RaftRuntime` to execute, to update the application state.
25#[derive(Debug)]
26pub(crate) enum Command<C>
27where C: RaftTypeConfig
28{
29    /// Becomes a leader, i.e., its `vote` is granted by a quorum.
30    /// The runtime initializes leader data when receives this command.
31    BecomeLeader,
32
33    /// No longer a leader. Clean up leader's data.
34    QuitLeader,
35
36    /// Append a `range` of entries.
37    AppendInputEntries {
38        /// The vote of the leader that submits the entries to write.
39        ///
40        /// The leader could be a local leader that appends entries to the local log store,
41        /// or a remote leader that replicates entries to this follower.
42        ///
43        /// The leader id is used to generate a monotonic increasing IO id, such as: [`LogIOId`].
44        /// Where [`LogIOId`] is `(leader_id, log_id)`.
45        ///
46        /// [`LogIOId`]: crate::raft_state::io_state::log_io_id::LogIOId
47        vote: Vote<C::NodeId>,
48
49        entries: Vec<C::Entry>,
50    },
51
52    /// Replicate the committed log id to other nodes
53    ReplicateCommitted { committed: Option<LogId<C::NodeId>> },
54
55    /// Commit log entries that are already persisted in the store, upto `upto`, inclusive.
56    ///
57    /// To `commit` logs, [`RaftLogStorage::save_committed()`] is called. And then committed logs
58    /// will be applied to the state machine by calling [`RaftStateMachine::apply()`].
59    ///
60    /// And if it is leader, send applied result to the client that proposed the entry.
61    ///
62    /// [`RaftLogStorage::save_committed()`]: crate::storage::RaftLogStorage::save_committed
63    /// [`RaftStateMachine::apply()`]: crate::storage::RaftStateMachine::apply
64    Commit {
65        // TODO: remove it when sm::Command to apply is generated by Engine
66        seq: sm::CommandSeq,
67        // TODO: pass the log id list or entries?
68        already_committed: Option<LogId<C::NodeId>>,
69        upto: LogId<C::NodeId>,
70    },
71
72    /// Replicate log entries or snapshot to a target.
73    Replicate {
74        target: C::NodeId,
75        req: Inflight<C::NodeId>,
76    },
77
78    /// Membership config changed, need to update replication streams.
79    /// The Runtime has to close all old replications and start new ones.
80    /// Because a replication stream should only report state for one membership config.
81    /// When membership config changes, the membership log id stored in ReplicationCore has to be
82    /// updated.
83    RebuildReplicationStreams {
84        /// Targets to replicate to.
85        targets: Vec<(C::NodeId, ProgressEntry<C::NodeId>)>,
86    },
87
88    /// Save vote to storage
89    SaveVote { vote: Vote<C::NodeId> },
90
91    /// Send vote to all other members
92    SendVote { vote_req: VoteRequest<C::NodeId> },
93
94    /// Purge log from the beginning to `upto`, inclusive.
95    PurgeLog { upto: LogId<C::NodeId> },
96
97    /// Delete logs that conflict with the leader from a follower/learner since log id `since`,
98    /// inclusive.
99    DeleteConflictLog { since: LogId<C::NodeId> },
100
101    // TODO(1): current it is only used to replace BuildSnapshot, InstallSnapshot, CancelSnapshot.
102    /// A command send to state machine worker [`sm::worker::Worker`].
103    ///
104    /// The runtime(`RaftCore`) will just forward this command to [`sm::worker::Worker`].
105    /// The response will be sent back in a `RaftMsg::StateMachine` message to `RaftCore`.
106    StateMachine { command: sm::Command<C> },
107
108    /// Send result to caller
109    Respond {
110        when: Option<Condition<C::NodeId>>,
111        resp: Respond<C>,
112    },
113}
114
115impl<C> From<sm::Command<C>> for Command<C>
116where C: RaftTypeConfig
117{
118    fn from(cmd: sm::Command<C>) -> Self {
119        Self::StateMachine { command: cmd }
120    }
121}
122
123/// For unit testing
124impl<C> PartialEq for Command<C>
125where
126    C: RaftTypeConfig,
127    C::Entry: PartialEq,
128{
129    #[rustfmt::skip]
130    fn eq(&self, other: &Self) -> bool {
131        match (self, other) {
132            (Command::BecomeLeader,                            Command::BecomeLeader)                                                          => true,
133            (Command::QuitLeader,                              Command::QuitLeader)                                                            => true,
134            (Command::AppendInputEntries { vote, entries },    Command::AppendInputEntries { vote: vb, entries: b }, )                         => vote == vb && entries == b,
135            (Command::ReplicateCommitted { committed },        Command::ReplicateCommitted { committed: b }, )                                 => committed == b,
136            (Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
137            (Command::Replicate { target, req },               Command::Replicate { target: b_target, req: other_req, }, )                     => target == b_target && req == other_req,
138            (Command::RebuildReplicationStreams { targets },   Command::RebuildReplicationStreams { targets: b }, )                            => targets == b,
139            (Command::SaveVote { vote },                       Command::SaveVote { vote: b })                                                  => vote == b,
140            (Command::SendVote { vote_req },                   Command::SendVote { vote_req: b }, )                                            => vote_req == b,
141            (Command::PurgeLog { upto },                       Command::PurgeLog { upto: b })                                                  => upto == b,
142            (Command::DeleteConflictLog { since },             Command::DeleteConflictLog { since: b }, )                                      => since == b,
143            (Command::Respond { when, resp: send },            Command::Respond { when: b_when, resp: b })                                     => send == b && when == b_when,
144            (Command::StateMachine { command },                Command::StateMachine { command: b })                                           => command == b,
145            _ => false,
146        }
147    }
148}
149
150impl<C> Command<C>
151where C: RaftTypeConfig
152{
153    #[allow(dead_code)]
154    #[rustfmt::skip]
155    pub(crate) fn kind(&self) -> CommandKind {
156        match self {
157            Command::BecomeLeader                     => CommandKind::Main,
158            Command::QuitLeader                       => CommandKind::Main,
159            Command::RebuildReplicationStreams { .. } => CommandKind::Main,
160            Command::Respond { .. }                   => CommandKind::Main,
161
162            Command::AppendInputEntries { .. }        => CommandKind::Log,
163            Command::SaveVote { .. }                  => CommandKind::Log,
164            Command::PurgeLog { .. }                  => CommandKind::Log,
165            Command::DeleteConflictLog { .. }         => CommandKind::Log,
166
167            Command::ReplicateCommitted { .. }        => CommandKind::Network,
168            Command::Replicate { .. }                 => CommandKind::Network,
169            Command::SendVote { .. }                  => CommandKind::Network,
170
171            Command::StateMachine { .. }              => CommandKind::StateMachine,
172            // Apply is firstly handled by RaftCore, then forwarded to state machine worker.
173            // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
174            Command::Commit { .. }                     => CommandKind::Main,
175        }
176    }
177
178    /// Return the condition the command waits for if any.
179    #[allow(dead_code)]
180    #[rustfmt::skip]
181    pub(crate) fn condition(&self) -> Option<&Condition<C::NodeId>> {
182        match self {
183            Command::BecomeLeader                     => None,
184            Command::QuitLeader                       => None,
185            Command::AppendInputEntries { .. }        => None,
186            Command::ReplicateCommitted { .. }        => None,
187            // TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
188            Command::Commit { .. }                     => None,
189            Command::Replicate { .. }                 => None,
190            Command::RebuildReplicationStreams { .. } => None,
191            Command::SaveVote { .. }                  => None,
192            Command::SendVote { .. }                  => None,
193            Command::PurgeLog { .. }                  => None,
194            Command::DeleteConflictLog { .. }         => None,
195            Command::Respond { when, .. }             => when.as_ref(),
196            // TODO(1): 
197            Command::StateMachine { .. }              => None,
198        }
199    }
200}
201
202/// A condition to wait for before running a command.
203#[derive(Debug, Clone, Copy)]
204#[derive(PartialEq, Eq)]
205pub(crate) enum Condition<NID>
206where NID: NodeId
207{
208    /// Wait until the log is flushed to the disk.
209    ///
210    /// In raft, a log io can be uniquely identified by `(leader_id, log_id)`, not `log_id`.
211    /// A same log id can be written multiple times by different leaders.
212    #[allow(dead_code)]
213    LogFlushed {
214        leader: LeaderId<NID>,
215        log_id: Option<LogId<NID>>,
216    },
217
218    /// Wait until the log is applied to the state machine.
219    #[allow(dead_code)]
220    Applied { log_id: Option<LogId<NID>> },
221
222    /// Wait until a [`sm::worker::Worker`] command is finished.
223    #[allow(dead_code)]
224    StateMachineCommand { command_seq: sm::CommandSeq },
225}
226
227/// A command to send return value to the caller via a `oneshot::Sender`.
228#[derive(Debug, PartialEq, Eq)]
229#[derive(derive_more::From)]
230pub(crate) enum Respond<C>
231where C: RaftTypeConfig
232{
233    Vote(ValueSender<C, Result<VoteResponse<C::NodeId>, Infallible>>),
234    AppendEntries(ValueSender<C, Result<AppendEntriesResponse<C::NodeId>, Infallible>>),
235    ReceiveSnapshotChunk(ValueSender<C, Result<(), InstallSnapshotError>>),
236    InstallSnapshot(ValueSender<C, Result<InstallSnapshotResponse<C::NodeId>, InstallSnapshotError>>),
237    InstallFullSnapshot(ValueSender<C, Result<SnapshotResponse<C::NodeId>, Infallible>>),
238    Initialize(ValueSender<C, Result<(), InitializeError<C::NodeId, C::Node>>>),
239}
240
241impl<C> Respond<C>
242where C: RaftTypeConfig
243{
244    pub(crate) fn new<T>(res: T, tx: OneshotSenderOf<C, T>) -> Self
245    where
246        T: Debug + PartialEq + Eq + OptionalSend,
247        Self: From<ValueSender<C, T>>,
248    {
249        Respond::from(ValueSender::new(res, tx))
250    }
251
252    pub(crate) fn send(self) {
253        match self {
254            Respond::Vote(x) => x.send(),
255            Respond::AppendEntries(x) => x.send(),
256            Respond::ReceiveSnapshotChunk(x) => x.send(),
257            Respond::InstallSnapshot(x) => x.send(),
258            Respond::InstallFullSnapshot(x) => x.send(),
259            Respond::Initialize(x) => x.send(),
260        }
261    }
262}
263
264#[derive(Debug)]
265pub(crate) struct ValueSender<C, T>
266where
267    T: Debug + PartialEq + Eq + OptionalSend,
268    C: RaftTypeConfig,
269{
270    value: T,
271    tx: OneshotSenderOf<C, T>,
272}
273
274impl<C, T> PartialEq for ValueSender<C, T>
275where
276    T: Debug + PartialEq + Eq + OptionalSend,
277    C: RaftTypeConfig,
278{
279    fn eq(&self, other: &Self) -> bool {
280        self.value == other.value
281    }
282}
283
284impl<C, T> Eq for ValueSender<C, T>
285where
286    T: Debug + PartialEq + Eq + OptionalSend,
287    C: RaftTypeConfig,
288{
289}
290
291impl<C, T> ValueSender<C, T>
292where
293    T: Debug + PartialEq + Eq + OptionalSend,
294    C: RaftTypeConfig,
295{
296    pub(crate) fn new(res: T, tx: OneshotSenderOf<C, T>) -> Self {
297        Self { value: res, tx }
298    }
299
300    pub(crate) fn send(self) {
301        let _ = self.tx.send(self.value);
302    }
303}