// openraft/core/raft_core.rs

1use std::borrow::Borrow;
2use std::collections::BTreeMap;
3use std::fmt::Debug;
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::marker::PhantomData;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyerror::AnyError;
12use futures::stream::FuturesUnordered;
13use futures::StreamExt;
14use futures::TryFutureExt;
15use maplit::btreeset;
16use tokio::select;
17use tokio::sync::mpsc;
18use tokio::sync::watch;
19use tracing::Instrument;
20use tracing::Level;
21use tracing::Span;
22
23use crate::async_runtime::AsyncOneshotSendExt;
24use crate::config::Config;
25use crate::config::RuntimeConfig;
26use crate::core::balancer::Balancer;
27use crate::core::command_state::CommandState;
28use crate::core::notify::Notify;
29use crate::core::raft_msg::external_command::ExternalCommand;
30use crate::core::raft_msg::AppendEntriesTx;
31use crate::core::raft_msg::ClientReadTx;
32use crate::core::raft_msg::RaftMsg;
33use crate::core::raft_msg::ResultSender;
34use crate::core::raft_msg::VoteTx;
35use crate::core::sm;
36use crate::core::sm::handle;
37use crate::core::sm::CommandSeq;
38use crate::core::ServerState;
39use crate::display_ext::DisplayInstantExt;
40use crate::display_ext::DisplayOption;
41use crate::display_ext::DisplayOptionExt;
42use crate::display_ext::DisplaySlice;
43use crate::engine::Command;
44use crate::engine::Condition;
45use crate::engine::Engine;
46use crate::engine::Respond;
47use crate::entry::FromAppData;
48use crate::entry::RaftEntry;
49use crate::error::ClientWriteError;
50use crate::error::Fatal;
51use crate::error::ForwardToLeader;
52use crate::error::Infallible;
53use crate::error::InitializeError;
54use crate::error::QuorumNotEnough;
55use crate::error::RPCError;
56use crate::error::Timeout;
57use crate::log_id::LogIdOptionExt;
58use crate::log_id::RaftLogId;
59use crate::metrics::RaftDataMetrics;
60use crate::metrics::RaftMetrics;
61use crate::metrics::RaftServerMetrics;
62use crate::metrics::ReplicationMetrics;
63use crate::network::RPCOption;
64use crate::network::RPCTypes;
65use crate::network::RaftNetwork;
66use crate::network::RaftNetworkFactory;
67use crate::progress::entry::ProgressEntry;
68use crate::progress::Inflight;
69use crate::progress::Progress;
70use crate::quorum::QuorumSet;
71use crate::raft::responder::Responder;
72use crate::raft::AppendEntriesRequest;
73use crate::raft::AppendEntriesResponse;
74use crate::raft::ClientWriteResponse;
75use crate::raft::VoteRequest;
76use crate::raft::VoteResponse;
77use crate::raft_state::LogIOId;
78use crate::raft_state::LogStateReader;
79use crate::replication;
80use crate::replication::request::Replicate;
81use crate::replication::request_id::RequestId;
82use crate::replication::response::ReplicationResult;
83use crate::replication::ReplicationCore;
84use crate::replication::ReplicationHandle;
85use crate::replication::ReplicationSessionId;
86use crate::runtime::RaftRuntime;
87use crate::storage::LogFlushed;
88use crate::storage::RaftLogReaderExt;
89use crate::storage::RaftLogStorage;
90use crate::storage::RaftStateMachine;
91use crate::type_config::alias::InstantOf;
92use crate::type_config::alias::ResponderOf;
93use crate::type_config::TypeConfigExt;
94use crate::AsyncRuntime;
95use crate::ChangeMembers;
96use crate::Instant;
97use crate::LogId;
98use crate::Membership;
99use crate::MessageSummary;
100use crate::Node;
101use crate::NodeId;
102use crate::OptionalSend;
103use crate::RaftTypeConfig;
104use crate::StorageError;
105use crate::StorageIOError;
106use crate::Vote;
107
/// A temp struct to hold the data for a node that is being applied.
#[derive(Debug)]
pub(crate) struct ApplyingEntry<NID: NodeId, N: Node> {
    /// The id of the log entry being applied.
    log_id: LogId<NID>,
    /// The membership config carried by this entry, if it is a membership-change entry.
    membership: Option<Membership<NID, N>>,
}
114
115impl<NID: NodeId, N: Node> ApplyingEntry<NID, N> {
116    pub(crate) fn new(log_id: LogId<NID>, membership: Option<Membership<NID, N>>) -> Self {
117        Self { log_id, membership }
118    }
119}
120
/// The result of applying log entries to state machine.
pub(crate) struct ApplyResult<C: RaftTypeConfig> {
    /// The first applied log index (inclusive).
    pub(crate) since: u64,
    /// One past the last applied log index (exclusive).
    pub(crate) end: u64,
    /// The log id of the last entry that was applied.
    pub(crate) last_applied: LogId<C::NodeId>,
    /// Per-entry metadata, one element per index in `since..end`.
    pub(crate) applying_entries: Vec<ApplyingEntry<C::NodeId, C::Node>>,
    /// Per-entry state-machine responses, one element per index in `since..end`.
    pub(crate) apply_results: Vec<C::R>,
}
129
130impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
131    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132        f.debug_struct("ApplyResult")
133            .field("since", &self.since)
134            .field("end", &self.end)
135            .field("last_applied", &self.last_applied)
136            .finish()
137    }
138}
139
/// Data for a Leader.
///
/// It is created when RaftCore enters leader state, and will be dropped when it quits leader state.
pub(crate) struct LeaderData<C: RaftTypeConfig> {
    /// The time to send next heartbeat.
    pub(crate) next_heartbeat: <C::AsyncRuntime as AsyncRuntime>::Instant,
}
147
148impl<C: RaftTypeConfig> LeaderData<C> {
149    pub(crate) fn new() -> Self {
150        Self {
151            next_heartbeat: C::now(),
152        }
153    }
154}
155
// TODO: remove SM
/// The core type implementing the Raft protocol.
pub struct RaftCore<C, N, LS, SM>
where
    C: RaftTypeConfig,
    N: RaftNetworkFactory<C>,
    LS: RaftLogStorage<C>,
    SM: RaftStateMachine<C>,
{
    /// This node's ID.
    pub(crate) id: C::NodeId,

    /// This node's runtime config.
    pub(crate) config: Arc<Config>,

    /// Config values that can be changed at runtime.
    pub(crate) runtime_config: Arc<RuntimeConfig>,

    /// The `RaftNetworkFactory` implementation.
    pub(crate) network: N,

    /// The [`RaftLogStorage`] implementation.
    pub(crate) log_store: LS,

    /// A controlling handle to the [`RaftStateMachine`] worker.
    pub(crate) sm_handle: handle::Handle<C>,

    /// The deterministic protocol engine: consumes events, emits [`Command`]s to run.
    pub(crate) engine: Engine<C>,

    /// Channels to send result back to client when logs are applied.
    pub(crate) client_resp_channels: BTreeMap<u64, ResponderOf<C>>,

    /// A mapping of node IDs the replication state of the target node.
    pub(crate) replications: BTreeMap<C::NodeId, ReplicationHandle<C>>,

    /// Leader-only data; `Some` iff this node is currently leader.
    pub(crate) leader_data: Option<LeaderData<C>>,

    // Kept so RaftCore can hand out clones of the API sender if needed.
    #[allow(dead_code)]
    pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
    /// Receiving end of client/API messages to process in the main loop.
    pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C>>,

    /// A Sender to send callback by other components to [`RaftCore`], when an action is finished,
    /// such as flushing log to disk, or applying log entries to state machine.
    pub(crate) tx_notify: mpsc::UnboundedSender<Notify<C>>,

    /// A Receiver to receive callback from other components.
    pub(crate) rx_notify: mpsc::UnboundedReceiver<Notify<C>>,

    /// Watch channel publishing the full metrics set.
    pub(crate) tx_metrics: watch::Sender<RaftMetrics<C::NodeId, C::Node>>,
    /// Watch channel publishing only data-related metrics.
    pub(crate) tx_data_metrics: watch::Sender<RaftDataMetrics<C::NodeId>>,
    /// Watch channel publishing only server/cluster-related metrics.
    pub(crate) tx_server_metrics: watch::Sender<RaftServerMetrics<C::NodeId, C::Node>>,

    /// Tracks engine commands that are submitted but not yet finished.
    pub(crate) command_state: CommandState,

    /// Parent tracing span for all work done by this core.
    pub(crate) span: Span,

    // SM is only used by associated methods; no value of it is stored.
    pub(crate) _p: PhantomData<SM>,
}
213
214impl<C, N, LS, SM> RaftCore<C, N, LS, SM>
215where
216    C: RaftTypeConfig,
217    N: RaftNetworkFactory<C>,
218    LS: RaftLogStorage<C>,
219    SM: RaftStateMachine<C>,
220{
221    /// The main loop of the Raft protocol.
222    pub(crate) async fn main(
223        mut self,
224        rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
225    ) -> Result<Infallible, Fatal<C::NodeId>> {
226        let span = tracing::span!(parent: &self.span, Level::DEBUG, "main");
227        let res = self.do_main(rx_shutdown).instrument(span).await;
228
229        // Flush buffered metrics
230        self.report_metrics(None);
231
232        // Safe unwrap: res is Result<Infallible, _>
233        let err = res.unwrap_err();
234        match err {
235            Fatal::Stopped => { /* Normal quit */ }
236            _ => {
237                tracing::error!(error = display(&err), "quit RaftCore::main on error");
238            }
239        }
240
241        tracing::debug!("update the metrics for shutdown");
242        {
243            let mut curr = self.tx_metrics.borrow().clone();
244            curr.state = ServerState::Shutdown;
245            curr.running_state = Err(err.clone());
246
247            let _ = self.tx_metrics.send(curr);
248        }
249
250        tracing::info!("RaftCore shutdown complete");
251
252        Err(err)
253    }
254
    /// Start up the engine, run its initial commands, then enter the runtime loop.
    #[tracing::instrument(level="trace", skip_all, fields(id=display(&self.id), cluster=%self.config.cluster_name))]
    async fn do_main(
        &mut self,
        rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
    ) -> Result<Infallible, Fatal<C::NodeId>> {
        tracing::debug!("raft node is initializing");

        self.engine.startup();
        // It may not finish running all of the commands, if there is a command waiting for a callback.
        self.run_engine_commands().await?;

        // Initialize metrics.
        self.report_metrics(None);

        // Only returns when a Fatal error (including shutdown) occurs.
        self.runtime_loop(rx_shutdown).await
    }
271
    /// Handle `is_leader` requests.
    ///
    /// Send heartbeat to all voters. We respond once we have
    /// a quorum of agreement.
    ///
    /// Why:
    /// To ensure linearizability, a read request proposed at time `T1` confirms this node's
    /// leadership to guarantee that all the committed entries proposed before `T1` are present in
    /// this node.
    // TODO: the second condition is such a read request can only read from state machine only when the last log it sees
    //       at `T1` is committed.
    #[tracing::instrument(level = "trace", skip(self, tx))]
    pub(super) async fn handle_check_is_leader_request(&mut self, tx: ClientReadTx<C>) {
        // Setup sentinel values to track when we've received majority confirmation of leadership.

        let resp = {
            let l = self.engine.leader_handler();
            let lh = match l {
                Ok(leading_handler) => leading_handler,
                Err(forward) => {
                    // Not the leader: tell the client where to forward the request.
                    let _ = tx.send(Err(forward.into()));
                    return;
                }
            };

            let read_log_id = lh.get_read_log_id();

            // TODO: this applied is a little stale when being returned to client.
            //       Fix this when the following heartbeats are replaced with calling RaftNetwork.
            let applied = self.engine.state.io_applied().cloned();

            (read_log_id, applied)
        };

        // Clone everything the spawned waiting task will need, since it outlives `self`.
        let my_id = self.id.clone();
        let my_vote = self.engine.state.vote_ref().clone();
        let ttl = Duration::from_millis(self.config.heartbeat_interval);
        let eff_mem = self.engine.state.membership_state.effective().clone();
        let core_tx = self.tx_notify.clone();

        // This node implicitly grants its own leadership.
        let mut granted = btreeset! {my_id.clone()};

        // Single-node (or single-voter) cluster: already a quorum, answer immediately.
        if eff_mem.is_quorum(granted.iter()) {
            let _ = tx.send(Ok(resp));
            return;
        }

        // Spawn parallel requests, all with the standard timeout for heartbeats.
        let mut pending = FuturesUnordered::new();

        let voter_progresses = {
            // Safe unwrap: leader_handler() above already proved this node is leading.
            let l = &self.engine.leader.as_ref().unwrap();
            l.progress.iter().filter(|(id, _v)| l.progress.is_voter(id) == Some(true))
        };

        for (target, progress) in voter_progresses {
            let target = target.clone();

            // Skip self; already counted in `granted`.
            if target == my_id {
                continue;
            }

            // An empty AppendEntries doubles as a leadership-confirmation heartbeat.
            let rpc = AppendEntriesRequest {
                vote: my_vote.clone(),
                prev_log_id: progress.matching.clone(),
                entries: vec![],
                leader_commit: self.engine.state.committed().cloned(),
            };

            // Safe unwrap(): target is in membership
            let target_node = eff_mem.get_node(&target).unwrap().clone();
            let mut client = self.network.new_client(target.clone(), &target_node).await;

            let option = RPCOption::new(ttl);

            let fu = {
                let my_id = my_id.clone();
                let target = target.clone();
                async move {
                    let outer_res = C::AsyncRuntime::timeout(ttl, client.append_entries(rpc, option)).await;
                    match outer_res {
                        Ok(append_res) => match append_res {
                            Ok(x) => Ok((target.clone(), x)),
                            Err(err) => Err((target.clone(), err)),
                        },
                        Err(_timeout) => {
                            // Convert the runtime timeout into an RPCError so both failure
                            // paths share the same Err shape.
                            let timeout_err = Timeout {
                                action: RPCTypes::AppendEntries,
                                id: my_id,
                                target: target.clone(),
                                timeout: ttl,
                            };

                            Err((target, RPCError::Timeout(timeout_err)))
                        }
                    }
                }
            };

            let fu = fu.instrument(tracing::debug_span!("spawn_is_leader", target = target.to_string()));
            let task = C::AsyncRuntime::spawn(fu).map_err(move |err| (target, err));

            pending.push(task);
        }

        let waiting_fu = async move {
            // Handle responses as they return.
            while let Some(res) = pending.next().await {
                let (target, append_res) = match res {
                    Ok(Ok(res)) => res,
                    Ok(Err((target, err))) => {
                        // RPC-level failure: this voter simply does not count toward the quorum.
                        tracing::error!(target=display(&target), error=%err, "timeout while confirming leadership for read request");
                        continue;
                    }
                    Err((target, err)) => {
                        // Task join failure: treat like an RPC failure.
                        tracing::error!(target = display(&target), "fail to join task: {}", err);
                        continue;
                    }
                };

                // If we receive a response with a greater vote, then revert to follower and abort this
                // request.
                if let AppendEntriesResponse::HigherVote(vote) = append_res {
                    debug_assert!(
                        vote > my_vote,
                        "committed vote({}) has total order relation with other votes({})",
                        my_vote,
                        vote
                    );

                    // Notify RaftCore so it can step down; this task cannot mutate state itself.
                    let send_res = core_tx.send(Notify::HigherVote {
                        target,
                        higher: vote,
                        sender_vote: my_vote,
                    });

                    if let Err(_e) = send_res {
                        tracing::error!("fail to send HigherVote to RaftCore");
                    }

                    // we are no longer leader so error out early
                    let err = ForwardToLeader::empty();
                    let _ = tx.send(Err(err.into()));
                    return;
                }

                granted.insert(target);

                // Answer as soon as a quorum has confirmed; remaining responses are ignored.
                if eff_mem.is_quorum(granted.iter()) {
                    let _ = tx.send(Ok(resp));
                    return;
                }
            }

            // If we've hit this location, then we've failed to gather needed confirmations due to
            // request failures.

            let _ = tx.send(Err(QuorumNotEnough {
                cluster: eff_mem.membership().summary(),
                got: granted,
            }
            .into()));
        };

        // TODO: do not spawn, manage read requests with a queue by RaftCore

        // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
        #[allow(clippy::let_underscore_future)]
        let _ = C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));
    }
442
443    /// Submit change-membership by writing a Membership log entry.
444    ///
445    /// If `retain` is `true`, removed `voter` will becomes `learner`. Otherwise they will
446    /// be just removed.
447    ///
448    /// Changing membership includes changing voters config or adding/removing learners:
449    ///
450    /// - To change voters config, it will build a new **joint** config. If it already a joint
451    ///   config, it returns the final uniform config.
452    /// - Adding a learner does not affect election, thus it does not need to enter joint consensus.
453    ///   But it still has to wait for the previous membership to commit. Otherwise a second
454    ///   proposed membership implies the previous one is committed.
455    // ---
456    // TODO: This limit can be removed if membership_state is replaced by a list of membership logs.
457    //       Because allowing this requires the engine to be able to store more than 2
458    //       membership logs. And it does not need to wait for the previous membership log to commit
459    //       to propose the new membership log.
460    #[tracing::instrument(level = "debug", skip(self, tx))]
461    pub(super) fn change_membership(
462        &mut self,
463        changes: ChangeMembers<C::NodeId, C::Node>,
464        retain: bool,
465        tx: ResponderOf<C>,
466    ) {
467        let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
468        let new_membership = match res {
469            Ok(x) => x,
470            Err(e) => {
471                tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
472                return;
473            }
474        };
475
476        let ent = C::Entry::new_membership(LogId::default(), new_membership);
477        self.write_entry(ent, Some(tx));
478    }
479
480    /// Write a log entry to the cluster through raft protocol.
481    ///
482    /// I.e.: append the log entry to local store, forward it to a quorum(including the leader),
483    /// waiting for it to be committed and applied.
484    ///
485    /// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`.
486    /// The calling side may not receive a result from `resp_tx`, if raft is shut down.
487    #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))]
488    pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<ResponderOf<C>>) -> bool {
489        tracing::debug!(payload = display(&entry), "write_entry");
490
491        let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) {
492            (lh, tx)
493        } else {
494            return false;
495        };
496
497        let entries = vec![entry];
498        // TODO: it should returns membership config error etc. currently this is done by the
499        //       caller.
500        lh.leader_append_entries(entries);
501        let index = lh.state.last_log_id().unwrap().index;
502
503        // Install callback channels.
504        if let Some(tx) = tx {
505            self.client_resp_channels.insert(index, tx);
506        }
507
508        true
509    }
510
511    /// Send a heartbeat message to every followers/learners.
512    ///
513    /// Currently heartbeat is a blank log
514    #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))]
515    pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
516        tracing::debug!(now = debug(C::now()), "send_heartbeat");
517
518        let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
519            lh
520        } else {
521            tracing::debug!(now = debug(C::now()), "{} failed to send heartbeat", emitter);
522            return false;
523        };
524
525        lh.send_heartbeat();
526
527        tracing::debug!("{} triggered sending heartbeat", emitter);
528        true
529    }
530
531    #[tracing::instrument(level = "debug", skip_all)]
532    pub fn flush_metrics(&mut self) {
533        let leader_metrics = if let Some(leader) = self.engine.leader.as_ref() {
534            let prog = &leader.progress;
535            Some(
536                prog.iter()
537                    .map(|(id, p)| {
538                        (
539                            id.clone(),
540                            <ProgressEntry<<C as RaftTypeConfig>::NodeId> as Borrow<Option<LogId<C::NodeId>>>>::borrow(
541                                p,
542                            )
543                            .clone(),
544                        )
545                    })
546                    .collect(),
547            )
548        } else {
549            None
550        };
551        self.report_metrics(leader_metrics);
552    }
553
    /// Report a metrics payload on the current state of the Raft node.
    ///
    /// Builds and publishes three metric sets (`RaftDataMetrics`, `RaftServerMetrics`,
    /// `RaftMetrics`) on their watch channels. `replication` is the per-target
    /// progress collected by the caller, or `None` when this node is not leader.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn report_metrics(&mut self, replication: Option<ReplicationMetrics<C::NodeId>>) {
        let last_quorum_acked = self.last_quorum_acked_time();
        let millis_since_quorum_ack = last_quorum_acked.map(|t| t.elapsed().as_millis() as u64);

        let st = &self.engine.state;

        let membership_config = st.membership_state.effective().stored_membership().clone();
        let current_leader = self.current_leader();

        let m = RaftMetrics {
            running_state: Ok(()),
            id: self.id.clone(),

            // --- data ---
            current_term: st.vote_ref().leader_id().get_term(),
            vote: st.io_state().vote().clone(),
            last_log_index: st.last_log_id().index(),
            last_applied: st.io_applied().cloned(),
            snapshot: st.io_snapshot_last_log_id().cloned(),
            purged: st.io_purged().cloned(),

            // --- cluster ---
            state: st.server_state,
            current_leader: current_leader.clone(),
            millis_since_quorum_ack,
            membership_config: membership_config.clone(),

            // --- replication ---
            replication: replication.clone(),
        };

        let data_metrics = RaftDataMetrics {
            last_log: st.last_log_id().cloned(),
            last_applied: st.io_applied().cloned(),
            snapshot: st.io_snapshot_last_log_id().cloned(),
            purged: st.io_purged().cloned(),
            millis_since_quorum_ack,
            replication,
        };

        let server_metrics = RaftServerMetrics {
            id: self.id.clone(),
            vote: st.io_state().vote().clone(),
            state: st.server_state,
            current_leader,
            membership_config,
        };

        // Start to send metrics
        // `RaftMetrics` is sent last, because `Wait` only examines `RaftMetrics`
        // but not `RaftDataMetrics` and `RaftServerMetrics`.
        // Thus if `RaftMetrics` change is perceived, the other two should have been updated.

        // Only wake watchers when the value actually changed.
        self.tx_data_metrics.send_if_modified(|metrix| {
            if data_metrics.ne(metrix) {
                *metrix = data_metrics.clone();
                return true;
            }
            false
        });

        self.tx_server_metrics.send_if_modified(|metrix| {
            if server_metrics.ne(metrix) {
                *metrix = server_metrics.clone();
                return true;
            }
            false
        });

        tracing::debug!("report_metrics: {}", m.summary());
        let res = self.tx_metrics.send(m);

        // Send fails only when all receivers are dropped; log but do not propagate.
        if let Err(err) = res {
            tracing::error!(error=%err, id=display(&self.id), "error reporting metrics");
        }
    }
632
633    /// Handle the admin command `initialize`.
634    ///
635    /// It is allowed to initialize only when `last_log_id.is_none()` and `vote==(0,0)`.
636    /// See: [Conditions for initialization][precondition]
637    ///
638    /// [precondition]: crate::docs::cluster_control::cluster_formation#preconditions-for-initialization
639    #[tracing::instrument(level = "debug", skip(self, tx))]
640    pub(crate) fn handle_initialize(
641        &mut self,
642        member_nodes: BTreeMap<C::NodeId, C::Node>,
643        tx: ResultSender<C, (), InitializeError<C::NodeId, C::Node>>,
644    ) {
645        tracing::debug!(member_nodes = debug(&member_nodes), "{}", func_name!());
646
647        let membership = Membership::from(member_nodes);
648
649        let entry = C::Entry::new_membership(LogId::default(), membership);
650        let res = self.engine.initialize(entry);
651        self.engine.output.push_command(Command::Respond {
652            when: None,
653            resp: Respond::new(res, tx),
654        });
655    }
656
    /// Trigger a snapshot building(log compaction) job if there is no pending building job.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(crate) fn trigger_snapshot(&mut self) {
        tracing::debug!("{}", func_name!());
        // Delegates to the engine; it is a no-op if a build is already in progress.
        self.engine.snapshot_handler().trigger_snapshot();
    }
663
664    #[tracing::instrument(level = "debug", skip(self))]
665    pub(crate) fn current_leader(&self) -> Option<C::NodeId> {
666        tracing::debug!(
667            self_id = display(&self.id),
668            vote = display(self.engine.state.vote_ref().summary()),
669            "get current_leader"
670        );
671
672        let vote = self.engine.state.vote_ref();
673
674        if !vote.is_committed() {
675            return None;
676        }
677
678        // Safe unwrap(): vote that is committed has to already have voted for some node.
679        let id = vote.leader_id().voted_for().unwrap();
680
681        // TODO: `is_voter()` is slow, maybe cache `current_leader`,
682        //       e.g., only update it when membership or vote changes
683        if self.engine.state.membership_state.effective().is_voter(&id) {
684            Some(id)
685        } else {
686            tracing::debug!("id={} is not a voter", id);
687            None
688        }
689    }
690
691    /// Retrieves the most recent timestamp that is acknowledged by a quorum.
692    ///
693    /// This function returns the latest known time at which the leader received acknowledgment
694    /// from a quorum of followers, indicating its leadership is current and recognized.
695    /// If the node is not a leader or no acknowledgment has been received, `None` is returned.
696    fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
697        let leading = self.engine.leader.as_mut();
698        leading.and_then(|l| l.last_quorum_acked_time())
699    }
700
701    pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<C::Node> {
702        let leader_id = leader_id?;
703
704        self.engine.state.membership_state.effective().get_node(&leader_id).cloned()
705    }
706
    /// A temp wrapper to make non-blocking `append_to_log` a blocking.
    ///
    /// Appends `entries` to the log store and awaits the flush callback before
    /// returning, converting both the callback-drop and the flush error into a
    /// `StorageError`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) async fn append_to_log<I>(
        &mut self,
        entries: I,
        vote: Vote<C::NodeId>,
        last_log_id: LogId<C::NodeId>,
    ) -> Result<(), StorageError<C::NodeId>>
    where
        I: IntoIterator<Item = C::Entry> + OptionalSend,
        I::IntoIter: OptionalSend,
    {
        tracing::debug!("append_to_log");

        // The storage signals flush completion through this oneshot channel.
        let (tx, rx) = C::AsyncRuntime::oneshot();
        let log_io_id = LogIOId::new(vote, Some(last_log_id));

        let callback = LogFlushed::new(log_io_id, tx);

        self.log_store.append(entries, callback).await?;
        // First map_err: the callback sender was dropped without reporting;
        // second: the flush itself reported an error.
        rx.await
            .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
            .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
        Ok(())
    }
732
733    #[tracing::instrument(level = "debug", skip_all)]
734    pub(crate) async fn apply_to_state_machine(
735        &mut self,
736        seq: CommandSeq,
737        since: u64,
738        upto_index: u64,
739    ) -> Result<(), StorageError<C::NodeId>> {
740        tracing::debug!(upto_index = display(upto_index), "{}", func_name!());
741
742        let end = upto_index + 1;
743
744        debug_assert!(
745            since <= end,
746            "last_applied index {} should <= committed index {}",
747            since,
748            end
749        );
750
751        if since == end {
752            return Ok(());
753        }
754
755        let entries = self.log_store.get_log_entries(since..end).await?;
756        tracing::debug!(
757            entries = display(DisplaySlice::<_>(entries.as_slice())),
758            "about to apply"
759        );
760
761        let last_applied = entries[entries.len() - 1].get_log_id().clone();
762
763        let cmd = sm::Command::apply(entries).with_seq(seq);
764        self.sm_handle.send(cmd).map_err(|e| StorageIOError::apply(last_applied, AnyError::error(e)))?;
765
766        Ok(())
767    }
768
769    /// When received results of applying log entries to the state machine, send back responses to
770    /// the callers that proposed the entries.
771    #[tracing::instrument(level = "debug", skip_all)]
772    pub(crate) fn handle_apply_result(&mut self, res: ApplyResult<C>) {
773        tracing::debug!(last_applied = display(res.last_applied), "{}", func_name!());
774
775        let mut results = res.apply_results.into_iter();
776        let mut applying_entries = res.applying_entries.into_iter();
777
778        for log_index in res.since..res.end {
779            let ent = applying_entries.next().unwrap();
780            let apply_res = results.next().unwrap();
781            let tx = self.client_resp_channels.remove(&log_index);
782
783            Self::send_response(ent, apply_res, tx);
784        }
785    }
786
787    /// Send result of applying a log entry to its client.
788    #[tracing::instrument(level = "debug", skip_all)]
789    pub(super) fn send_response(entry: ApplyingEntry<C::NodeId, C::Node>, resp: C::R, tx: Option<ResponderOf<C>>) {
790        tracing::debug!(entry = debug(&entry), "send_response");
791
792        let tx = match tx {
793            None => return,
794            Some(x) => x,
795        };
796
797        let membership = entry.membership;
798
799        let res = Ok(ClientWriteResponse {
800            log_id: entry.log_id,
801            data: resp,
802            membership,
803        });
804
805        tx.send(res);
806    }
807
    /// Spawn a new replication stream returning its replication state handle.
    ///
    /// Creates two network clients for `target` (one for log replication, one
    /// for snapshot transfer) and starts a `ReplicationCore` task driven by the
    /// current leader's vote and effective membership log id.
    #[tracing::instrument(level = "debug", skip(self))]
    #[allow(clippy::type_complexity)]
    pub(crate) async fn spawn_replication_stream(
        &mut self,
        target: C::NodeId,
        progress_entry: ProgressEntry<C::NodeId>,
    ) -> ReplicationHandle<C> {
        // Safe unwrap(): target must be in membership
        let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap();

        let membership_log_id = self.engine.state.membership_state.effective().log_id();
        let network = self.network.new_client(target.clone(), target_node).await;
        let snapshot_network = self.network.new_client(target.clone(), target_node).await;

        // Safe unwrap(): replication is only spawned while this node is leading.
        let leader = self.engine.leader.as_ref().unwrap();

        // The session id ties replication responses to this (vote, membership) epoch.
        let session_id = ReplicationSessionId::new(leader.vote.clone(), membership_log_id.clone());

        ReplicationCore::<C, N, LS>::spawn(
            target.clone(),
            session_id,
            self.config.clone(),
            self.engine.state.committed().cloned(),
            progress_entry.matching,
            network,
            snapshot_network,
            self.log_store.get_log_reader().await,
            self.sm_handle.new_snapshot_reader(),
            self.tx_notify.clone(),
            tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(&self.id), target=display(&target)),
        )
    }
841
842    /// Remove all replication.
843    #[tracing::instrument(level = "debug", skip_all)]
844    pub async fn remove_all_replication(&mut self) {
845        tracing::info!("remove all replication");
846
847        let nodes = std::mem::take(&mut self.replications);
848
849        tracing::debug!(
850            targets = debug(nodes.iter().map(|x| x.0.clone()).collect::<Vec<_>>()),
851            "remove all targets from replication_metrics"
852        );
853
854        for (target, s) in nodes {
855            let handle = s.join_handle;
856
857            // Drop sender to notify the task to shutdown
858            drop(s.tx_repl);
859
860            tracing::debug!("joining removed replication: {}", target);
861            let _x = handle.await;
862            tracing::info!("Done joining removed replication : {}", target);
863        }
864    }
865
866    /// Run as many commands as possible.
867    ///
868    /// If there is a command that waits for a callback, just return and wait for
869    /// next RaftMsg.
870    #[tracing::instrument(level = "debug", skip_all)]
871    pub(crate) async fn run_engine_commands(&mut self) -> Result<(), StorageError<C::NodeId>> {
872        if tracing::enabled!(Level::DEBUG) {
873            tracing::debug!("queued commands: start...");
874            for c in self.engine.output.iter_commands() {
875                tracing::debug!("queued commands: {:?}", c);
876            }
877            tracing::debug!("queued commands: end...");
878        }
879
880        while let Some(cmd) = self.engine.output.pop_command() {
881            tracing::debug!("run command: {:?}", cmd);
882
883            let res = self.run_command(cmd).await?;
884
885            if let Some(cmd) = res {
886                tracing::debug!("early return: postpone command: {:?}", cmd);
887                self.engine.output.postpone_command(cmd);
888
889                if tracing::enabled!(Level::DEBUG) {
890                    for c in self.engine.output.iter_commands().take(8) {
891                        tracing::debug!("postponed, first 8 queued commands: {:?}", c);
892                    }
893                }
894
895                return Ok(());
896            }
897        }
898
899        Ok(())
900    }
901
    /// Run an event handling loop
    ///
    /// It always returns a [`Fatal`] error upon returning.
    #[tracing::instrument(level="debug", skip_all, fields(id=display(&self.id)))]
    async fn runtime_loop(
        &mut self,
        mut rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
    ) -> Result<Infallible, Fatal<C::NodeId>> {
        // Ratio control the ratio of number of RaftMsg to process to number of Notify to process.
        let mut balancer = Balancer::new(10_000);

        loop {
            self.flush_metrics();

            // In each loop, it does not have to check rx_shutdown and flush metrics for every RaftMsg
            // processed.
            // In each loop, the first step is blocking waiting for any message from any channel.
            // Then if there is any message, process as many as possible to maximize throughput.

            select! {
                // Check shutdown in each loop first so that a message flood in `tx_api` won't block shutting down.
                // `select!` without `biased` provides a random fairness.
                // We want to check shutdown prior to other channels.
                // See: https://docs.rs/tokio/latest/tokio/macro.select.html#fairness
                biased;

                _ = &mut rx_shutdown => {
                    tracing::info!("recv from rx_shutdown");
                    return Err(Fatal::Stopped);
                }

                notify_res = self.rx_notify.recv() => {
                    match notify_res {
                        Some(notify) => self.handle_notify(notify)?,
                        None => {
                            // All internal senders dropped: no task can make progress anymore.
                            tracing::error!("all rx_notify senders are dropped");
                            return Err(Fatal::Stopped);
                        }
                    };
                }

                msg_res = self.rx_api.recv() => {
                    match msg_res {
                        Some(msg) => self.handle_api_msg(msg).await,
                        None => {
                            // The API handle(s) were dropped by the application; shut down.
                            tracing::info!("all rx_api senders are dropped");
                            return Err(Fatal::Stopped);
                        }
                    };
                }
            }

            // Apply whatever engine commands the message handled above produced.
            self.run_engine_commands().await?;

            // There is a message waking up the loop, process channels one by one.

            let raft_msg_processed = self.process_raft_msg(balancer.raft_msg()).await?;
            let notify_processed = self.process_notify(balancer.notify()).await?;

            // If one of the channel consumed all its budget, re-balance the budget ratio.

            #[allow(clippy::collapsible_else_if)]
            if notify_processed == balancer.notify() {
                tracing::info!("there may be more Notify to process, increase Notify ratio");
                balancer.increase_notify();
            } else {
                if raft_msg_processed == balancer.raft_msg() {
                    tracing::info!("there may be more RaftMsg to process, increase RaftMsg ratio");
                    balancer.increase_raft_msg();
                }
            }
        }
    }
975
976    /// Process RaftMsg as many as possible.
977    ///
978    /// It returns the number of processed message.
979    /// If the input channel is closed, it returns `Fatal::Stopped`.
980    async fn process_raft_msg(&mut self, at_most: u64) -> Result<u64, Fatal<C::NodeId>> {
981        for i in 0..at_most {
982            let res = self.rx_api.try_recv();
983            let msg = match res {
984                Ok(msg) => msg,
985                Err(e) => match e {
986                    mpsc::error::TryRecvError::Empty => {
987                        tracing::debug!("all RaftMsg are processed, wait for more");
988                        return Ok(i + 1);
989                    }
990                    mpsc::error::TryRecvError::Disconnected => {
991                        tracing::debug!("rx_api is disconnected, quit");
992                        return Err(Fatal::Stopped);
993                    }
994                },
995            };
996
997            self.handle_api_msg(msg).await;
998
999            // TODO: does run_engine_commands() run too frequently?
1000            //       to run many commands in one shot, it is possible to batch more commands to gain
1001            //       better performance.
1002
1003            self.run_engine_commands().await?;
1004        }
1005
1006        tracing::debug!("at_most({}) reached, there are more queued RaftMsg to process", at_most);
1007
1008        Ok(at_most)
1009    }
1010
1011    /// Process Notify as many as possible.
1012    ///
1013    /// It returns the number of processed notifications.
1014    /// If the input channel is closed, it returns `Fatal::Stopped`.
1015    async fn process_notify(&mut self, at_most: u64) -> Result<u64, Fatal<C::NodeId>> {
1016        for i in 0..at_most {
1017            let res = self.rx_notify.try_recv();
1018            let notify = match res {
1019                Ok(msg) => msg,
1020                Err(e) => match e {
1021                    mpsc::error::TryRecvError::Empty => {
1022                        tracing::debug!("all Notify are processed, wait for more");
1023                        return Ok(i + 1);
1024                    }
1025                    mpsc::error::TryRecvError::Disconnected => {
1026                        tracing::error!("rx_notify is disconnected, quit");
1027                        return Err(Fatal::Stopped);
1028                    }
1029                },
1030            };
1031
1032            self.handle_notify(notify)?;
1033
1034            // TODO: does run_engine_commands() run too frequently?
1035            //       to run many commands in one shot, it is possible to batch more commands to gain
1036            //       better performance.
1037
1038            self.run_engine_commands().await?;
1039        }
1040
1041        tracing::debug!("at_most({}) reached, there are more queued Notify to process", at_most);
1042
1043        Ok(at_most)
1044    }
1045
    /// Spawn parallel vote requests to all cluster members.
    ///
    /// One async task is spawned per remote voter; each sends a single vote RPC
    /// bounded by `election_timeout_min` and reports a successful response back
    /// to `RaftCore` via the notify channel. Timeouts and RPC errors are logged
    /// and dropped.
    #[tracing::instrument(level = "trace", skip_all, fields(vote=vote_req.summary()))]
    async fn spawn_parallel_vote_requests(&mut self, vote_req: &VoteRequest<C::NodeId>) {
        let members = self.engine.state.membership_state.effective().voter_ids();

        let vote = vote_req.vote.clone();

        for target in members {
            // This node votes for itself elsewhere; skip it here.
            if target == self.id {
                continue;
            }

            let req = vote_req.clone();

            // Safe unwrap(): target must be in membership
            let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone();
            let mut client = self.network.new_client(target.clone(), &target_node).await;

            // Responses flow back into the runtime loop through the notify channel.
            let tx = self.tx_notify.clone();

            let ttl = Duration::from_millis(self.config.election_timeout_min);
            let id = self.id.clone();
            let option = RPCOption::new(ttl);

            // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
            #[allow(clippy::let_underscore_future)]
            let _ = C::AsyncRuntime::spawn(
                {
                    let target = target.clone();
                    let vote = vote.clone();

                    async move {
                        let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await;
                        let res = match tm_res {
                            Ok(res) => res,

                            Err(_timeout) => {
                                // A timed-out request is logged and dropped; no notify is sent.
                                let timeout_err = Timeout {
                                    action: RPCTypes::Vote,
                                    id,
                                    target: target.clone(),
                                    timeout: ttl,
                                };
                                tracing::error!({error = %timeout_err, target = display(&target)}, "timeout");
                                return;
                            }
                        };

                        match res {
                            Ok(resp) => {
                                // Send error is ignored: RaftCore may already be shut down.
                                let _ = tx.send(Notify::VoteResponse {
                                    target,
                                    resp,
                                    sender_vote: vote,
                                });
                            }
                            Err(err) => tracing::error!({error=%err, target=display(&target)}, "while requesting vote"),
                        }
                    }
                }
                .instrument(tracing::debug_span!(
                    parent: &Span::current(),
                    "send_vote_req",
                    target = display(&target)
                )),
            );
        }
    }
1114
1115    #[tracing::instrument(level = "debug", skip_all)]
1116    pub(super) fn handle_vote_request(&mut self, req: VoteRequest<C::NodeId>, tx: VoteTx<C>) {
1117        tracing::info!(req = display(req.summary()), func = func_name!());
1118
1119        let resp = self.engine.handle_vote_req(req);
1120        self.engine.output.push_command(Command::Respond {
1121            when: None,
1122            resp: Respond::new(Ok(resp), tx),
1123        });
1124    }
1125
1126    #[tracing::instrument(level = "debug", skip_all)]
1127    pub(super) fn handle_append_entries_request(&mut self, req: AppendEntriesRequest<C>, tx: AppendEntriesTx<C>) {
1128        tracing::debug!(req = display(req.summary()), func = func_name!());
1129
1130        let is_ok = self.engine.handle_append_entries(&req.vote, req.prev_log_id, req.entries, Some(tx));
1131
1132        if is_ok {
1133            self.engine.handle_commit_entries(req.leader_commit);
1134        }
1135    }
1136
    // TODO: Make this method non-async. It does not need to run any async command in it.
    /// Dispatch a single application-facing [`RaftMsg`] to its handler.
    #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(&self.id)))]
    pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C>) {
        tracing::debug!("recv from rx_api: {}", msg.summary());

        match msg {
            RaftMsg::AppendEntries { rpc, tx } => {
                self.handle_append_entries_request(rpc, tx);
            }
            RaftMsg::RequestVote { rpc, tx } => {
                let now = C::now();
                tracing::info!(
                    now = display(now.display()),
                    vote_request = display(&rpc),
                    "received RaftMsg::RequestVote: {}",
                    func_name!()
                );

                self.handle_vote_request(rpc, tx);
            }
            RaftMsg::BeginReceivingSnapshot { tx } => {
                self.engine.handle_begin_receiving_snapshot(tx);
            }
            RaftMsg::InstallFullSnapshot { vote, snapshot, tx } => {
                self.engine.handle_install_full_snapshot(vote, snapshot, tx);
            }
            RaftMsg::CheckIsLeaderRequest { tx } => {
                self.handle_check_is_leader_request(tx).await;
            }
            RaftMsg::ClientWriteRequest { app_data, tx } => {
                self.write_entry(C::Entry::from_app_data(app_data), Some(tx));
            }
            RaftMsg::Initialize { members, tx } => {
                tracing::info!(
                    members = debug(&members),
                    "received RaftMsg::Initialize: {}",
                    func_name!()
                );

                self.handle_initialize(members, tx);
            }
            RaftMsg::ChangeMembership { changes, retain, tx } => {
                tracing::info!(
                    members = debug(&changes),
                    retain = debug(&retain),
                    "received RaftMsg::ChangeMembership: {}",
                    func_name!()
                );

                self.change_membership(changes, retain, tx);
            }
            RaftMsg::ExternalCoreRequest { req } => {
                // Run the caller-provided closure against the current RaftState.
                req(&self.engine.state);
            }
            RaftMsg::ExternalCommand { cmd } => {
                tracing::info!(cmd = debug(&cmd), "received RaftMsg::ExternalCommand: {}", func_name!());

                match cmd {
                    ExternalCommand::Elect => {
                        // Only a voter may start an election.
                        if self.engine.state.membership_state.effective().is_voter(&self.id) {
                            // TODO: reject if it is already a leader?
                            self.engine.elect();
                            tracing::debug!("ExternalCommand: triggered election");
                        } else {
                            // Node is switched to learner.
                        }
                    }
                    ExternalCommand::Heartbeat => {
                        self.send_heartbeat("ExternalCommand");
                    }
                    ExternalCommand::Snapshot => self.trigger_snapshot(),
                    ExternalCommand::GetSnapshot { tx } => {
                        // Forward to the state-machine worker; a send failure is only logged.
                        let cmd = sm::Command::get_snapshot(tx);
                        let res = self.sm_handle.send(cmd);
                        if let Err(e) = res {
                            tracing::error!(error = display(e), "error sending GetSnapshot to sm worker");
                        }
                    }
                    ExternalCommand::PurgeLog { upto } => {
                        self.engine.trigger_purge_log(upto);
                    }
                }
            }
        };
    }
1222
    // NOTE(review): a stale "make this method non-async" TODO was removed — the method is already sync.
    /// Dispatch a single internal [`Notify`] message: vote responses, tick events,
    /// replication progress/errors, and state-machine command results.
    #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(&self.id)))]
    pub(crate) fn handle_notify(&mut self, notify: Notify<C>) -> Result<(), Fatal<C::NodeId>> {
        tracing::debug!("recv from rx_notify: {}", notify.summary());

        match notify {
            Notify::VoteResponse {
                target,
                resp,
                sender_vote,
            } => {
                let now = C::now();

                tracing::info!(
                    now = display(now.display()),
                    resp = display(&resp),
                    "received Notify::VoteResponse: {}",
                    func_name!()
                );

                // Ignore stale responses sent under a different vote.
                if self.does_vote_match(&sender_vote, "VoteResponse") {
                    self.engine.handle_vote_resp(target, resp);
                }
            }

            Notify::HigherVote {
                target,
                higher,
                sender_vote,
            } => {
                tracing::info!(
                    target = display(&target),
                    higher_vote = display(&higher),
                    sending_vote = display(&sender_vote),
                    "received Notify::HigherVote: {}",
                    func_name!()
                );

                if self.does_vote_match(&sender_vote, "HigherVote") {
                    // Rejected vote change is ok.
                    let _ = self.engine.vote_handler().update_vote(&higher);
                }
            }

            Notify::Tick { i } => {
                // check every timer

                let now = C::now();
                tracing::debug!("received tick: {}, now: {:?}", i, now);

                self.handle_tick_election();

                // TODO: test: fixture: make isolated_nodes a single-way isolating.

                // TODO: check if it is Leader with Engine
                // Leader send heartbeat
                let heartbeat_at = self.leader_data.as_ref().map(|x| x.next_heartbeat);
                if let Some(t) = heartbeat_at {
                    if now >= t {
                        // Heartbeat can be disabled at runtime, e.g. by tests.
                        if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) {
                            self.send_heartbeat("tick");
                        }

                        // Install next heartbeat
                        if let Some(l) = &mut self.leader_data {
                            l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval);
                        }
                    }
                }

                // When a membership that removes the leader is committed,
                // the leader continue to work for a short while before reverting to a learner.
                // This way, let the leader replicate the `membership-log-is-committed` message to
                // followers.
                // Otherwise, if the leader step down at once, the follower might have to
                // re-commit the membership log again, electing itself.
                //
                // ---
                //
                // Stepping down only when the response of the second change-membership is sent.
                // Otherwise the Sender to the caller will be dropped before sending back the
                // response.

                // TODO: temp solution: Manually wait until the second membership log being applied to state
                //       machine. Because the response is sent back to the caller after log is
                //       applied.
                //       ---
                //       A better way is to make leader step down a command that waits for the log to be applied.
                if self.engine.state.io_applied() >= self.engine.state.membership_state.effective().log_id().as_ref() {
                    self.engine.leader_step_down();
                }
            }

            Notify::Network { response } => {
                //
                match response {
                    replication::Response::Progress {
                        target,
                        request_id: id,
                        result,
                        session_id,
                    } => {
                        // If vote or membership changes, ignore the message.
                        // There is chance delayed message reports a wrong state.
                        if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") {
                            self.handle_replication_progress(target, id, result);
                        }
                    }

                    replication::Response::StorageError { error } => {
                        tracing::error!(
                            error = display(&error),
                            "received Notify::ReplicationStorageError: {}",
                            func_name!()
                        );

                        // A storage error is unrecoverable: propagate it as Fatal to stop RaftCore.
                        return Err(Fatal::from(error));
                    }

                    replication::Response::HigherVote {
                        target,
                        higher,
                        sender_vote,
                    } => {
                        tracing::info!(
                            target = display(&target),
                            higher_vote = display(&higher),
                            sender_vote = display(&sender_vote),
                            "received Notify::HigherVote: {}",
                            func_name!()
                        );

                        if self.does_vote_match(&sender_vote, "HigherVote") {
                            // Rejected vote change is ok.
                            let _ = self.engine.vote_handler().update_vote(&higher);
                        }
                    }
                }
            }

            Notify::StateMachine { command_result } => {
                tracing::debug!("sm::StateMachine command result: {:?}", command_result);

                let seq = command_result.command_seq;
                // `?` propagates a state-machine StorageError as a Fatal error.
                let res = command_result.result?;

                match res {
                    // BuildSnapshot is a read operation that does not have to be serialized by
                    // sm::Worker. Thus it may finish out of order.
                    sm::Response::BuildSnapshot(_) => {}
                    _ => {
                        debug_assert!(
                            self.command_state.finished_sm_seq < seq,
                            "sm::StateMachine command result is out of order: expect {} < {}",
                            self.command_state.finished_sm_seq,
                            seq
                        );
                    }
                }
                self.command_state.finished_sm_seq = seq;

                match res {
                    sm::Response::BuildSnapshot(meta) => {
                        tracing::info!(
                            "sm::StateMachine command done: BuildSnapshot: {}: {}",
                            meta.summary(),
                            func_name!()
                        );

                        // Update in-memory state first, then the io state.
                        // In-memory state should always be ahead or equal to the io state.

                        let last_log_id = meta.last_log_id.clone();
                        self.engine.finish_building_snapshot(meta);

                        let st = self.engine.state.io_state_mut();
                        st.update_snapshot(last_log_id);
                    }
                    sm::Response::InstallSnapshot(meta) => {
                        tracing::info!(
                            "sm::StateMachine command done: InstallSnapshot: {}: {}",
                            meta.summary(),
                            func_name!()
                        );

                        // `None` means the snapshot was not installed; nothing to record.
                        if let Some(meta) = meta {
                            let st = self.engine.state.io_state_mut();
                            st.update_applied(meta.last_log_id.clone());
                            st.update_snapshot(meta.last_log_id);
                        }
                    }
                    sm::Response::Apply(res) => {
                        self.engine.state.io_state_mut().update_applied(Some(res.last_applied.clone()));

                        self.handle_apply_result(res);
                    }
                }
            }
        };
        Ok(())
    }
1424
    /// Decide on each tick whether this node should start an election.
    ///
    /// An election is skipped when the node is already Leader, is not a voter,
    /// or elections are disabled at runtime. A single-voter cluster elects
    /// immediately; otherwise the election timeout must have elapsed.
    #[tracing::instrument(level = "debug", skip_all)]
    fn handle_tick_election(&mut self) {
        let now = C::now();

        tracing::debug!("try to trigger election by tick, now: {:?}", now);

        // TODO: leader lease should be extended. Or it has to examine if it is leader
        //       before electing.
        if self.engine.state.server_state == ServerState::Leader {
            tracing::debug!("already a leader, do not elect again");
            return;
        }

        if !self.engine.state.membership_state.effective().is_voter(&self.id) {
            tracing::debug!("this node is not a voter");
            return;
        }

        if !self.runtime_config.enable_elect.load(Ordering::Relaxed) {
            tracing::debug!("election is disabled");
            return;
        }

        if self.engine.state.membership_state.effective().voter_ids().count() == 1 {
            tracing::debug!("this is the only voter, do election at once");
        } else {
            tracing::debug!("there are multiple voter, check election timeout");

            let current_vote = self.engine.state.vote_ref();
            let utime = self.engine.state.vote_last_modified();
            let timer_config = &self.engine.config.timer_config;

            // A committed vote means a Leader is (or was) active: extend the
            // timeout by the leader lease so we do not disturb a live Leader.
            let mut election_timeout = if current_vote.is_committed() {
                timer_config.leader_lease + timer_config.election_timeout
            } else {
                timer_config.election_timeout
            };

            // Back off further if another node is known to have a greater log:
            // it is more likely to win; let it try first.
            if self.engine.is_there_greater_log() {
                election_timeout += timer_config.smaller_log_timeout;
            }

            tracing::debug!(
                "vote utime: {:?}, current_vote: {}, now-utime:{:?}, election_timeout: {:?}",
                utime,
                current_vote,
                utime.map(|x| now - x),
                election_timeout,
            );

            // Follower/Candidate timer: next election
            // Note: `None < Some(_)`, so a `None` utime (never modified) always passes this check.
            if utime > Some(now - election_timeout) {
                tracing::debug!("election timeout has not yet passed",);
                return;
            }

            tracing::info!("election timeout passed, check if it is a voter for election");
        }

        // Every time elect, reset this flag.
        self.engine.reset_greater_log();

        tracing::info!("do trigger election");
        self.engine.elect();
    }
1490
1491    #[tracing::instrument(level = "debug", skip_all)]
1492    fn handle_replication_progress(
1493        &mut self,
1494        target: C::NodeId,
1495        request_id: RequestId,
1496        result: Result<ReplicationResult<C>, String>,
1497    ) {
1498        tracing::debug!(
1499            target = display(&target),
1500            request_id = display(request_id),
1501            result = debug(&result),
1502            "handle_replication_progress"
1503        );
1504
1505        #[allow(clippy::collapsible_if)]
1506        if tracing::enabled!(Level::DEBUG) {
1507            if !self.replications.contains_key(&target) {
1508                tracing::warn!("leader has removed target: {}", target);
1509            };
1510        }
1511
1512        // A leader may have stepped down.
1513        if self.engine.leader.is_some() {
1514            self.engine.replication_handler().update_progress(target, request_id, result);
1515        }
1516    }
1517
1518    /// If a message is sent by a previous server state but is received by current server state,
1519    /// it is a stale message and should be just ignored.
1520    fn does_vote_match(&self, sender_vote: &Vote<C::NodeId>, msg: impl Display) -> bool {
1521        // Get the current leading vote:
1522        // - If input `sender_vote` is committed, it is sent by a Leader. Therefore we check against current
1523        //   Leader's vote.
1524        // - Otherwise, it is sent by a Candidate, we check against the current in progress voting state.
1525        let my_vote = if sender_vote.is_committed() {
1526            let l = self.engine.leader.as_ref();
1527            l.map(|x| x.vote.clone())
1528        } else {
1529            // If it finished voting, Candidate's vote is None.
1530            let candidate = self.engine.candidate_ref();
1531            candidate.map(|x| x.vote_ref().clone())
1532        };
1533
1534        if Some(sender_vote) != my_vote.as_ref() {
1535            tracing::warn!(
1536                "A message will be ignored because vote changed: msg sent by vote: {}; current my vote: {}; when ({})",
1537                sender_vote,
1538                my_vote.display(),
1539                msg
1540            );
1541            false
1542        } else {
1543            true
1544        }
1545    }
1546    /// If a message is sent by a previous replication session but is received by current server
1547    /// state, it is a stale message and should be just ignored.
1548    fn does_replication_session_match(
1549        &self,
1550        session_id: &ReplicationSessionId<C::NodeId>,
1551        msg: impl Display + Copy,
1552    ) -> bool {
1553        if !self.does_vote_match(session_id.vote_ref(), msg) {
1554            return false;
1555        }
1556
1557        if &session_id.membership_log_id != self.engine.state.membership_state.effective().log_id() {
1558            tracing::warn!(
1559                "membership_log_id changed: msg sent by: {}; curr: {}; ignore when ({})",
1560                session_id.membership_log_id.summary(),
1561                self.engine.state.membership_state.effective().log_id().summary(),
1562                msg
1563            );
1564            return false;
1565        }
1566        true
1567    }
1568}
1569
impl<C, N, LS, SM> RaftRuntime<C> for RaftCore<C, N, LS, SM>
where
    C: RaftTypeConfig,
    N: RaftNetworkFactory<C>,
    LS: RaftLogStorage<C>,
    SM: RaftStateMachine<C>,
{
    /// Execute a single [`Command`] emitted by the `Engine`.
    ///
    /// If the command carries a pre-condition that is not yet satisfied, the command is returned
    /// unchanged as `Ok(Some(cmd))` so the caller can re-run it later. Otherwise the command is
    /// executed and `Ok(None)` is returned. Storage failures surface as `StorageError`.
    async fn run_command(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C::NodeId>> {
        let condition = cmd.condition();
        tracing::debug!("condition: {:?}", condition);

        // Check the command's pre-condition, if any; postpone the command when it does not hold.
        if let Some(condition) = condition {
            match condition {
                Condition::LogFlushed { .. } => {
                    // NOTE(review): this condition is not implemented; reaching it panics.
                    todo!()
                }
                Condition::Applied { log_id } => {
                    // Postpone until the state machine has applied logs up to `log_id`.
                    if self.engine.state.io_applied() < log_id.as_ref() {
                        tracing::debug!(
                            "log_id: {} has not yet applied, postpone cmd: {:?}",
                            DisplayOption(log_id),
                            cmd
                        );
                        return Ok(Some(cmd));
                    }
                }
                Condition::StateMachineCommand { command_seq } => {
                    // Postpone until the state-machine worker has finished command `command_seq`.
                    if self.command_state.finished_sm_seq < *command_seq {
                        tracing::debug!(
                            "sm::Command({}) has not yet finished({}), postpone cmd: {:?}",
                            command_seq,
                            self.command_state.finished_sm_seq,
                            cmd
                        );
                        return Ok(Some(cmd));
                    }
                }
            }
        }

        match cmd {
            Command::BecomeLeader => {
                debug_assert!(self.leader_data.is_none(), "can not become leader twice");
                self.leader_data = Some(LeaderData::new());
            }
            Command::QuitLeader => {
                self.leader_data = None;
            }
            Command::AppendInputEntries { vote, entries } => {
                // `unwrap()`: assumes `entries` is non-empty for this command — presumably
                // guaranteed by the Engine; TODO confirm at the command's construction site.
                let last_log_id = entries.last().unwrap().get_log_id().clone();
                tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

                self.append_to_log(entries, vote, last_log_id.clone()).await?;

                // The leader may have changed.
                // But reporting to a different leader is not a problem.
                if let Ok(mut lh) = self.engine.leader_handler() {
                    lh.replication_handler().update_local_progress(Some(last_log_id));
                }
            }
            Command::SaveVote { vote } => {
                // Persist the vote before acknowledging it anywhere else.
                self.log_store.save_vote(&vote).await?;
                self.engine.state.io_state_mut().update_vote(vote.clone());

                // Feed a vote response for the local node into the notification channel, so the
                // local "grant" flows through the same path as remote vote responses.
                let _ = self.tx_notify.send(Notify::VoteResponse {
                    target: self.id.clone(),
                    // last_log_id is not used when sending VoteRequest to local node
                    resp: VoteResponse::new(vote.clone(), None, true),
                    sender_vote: vote,
                });
            }
            Command::PurgeLog { upto } => {
                // Purge from storage first, then record the new purged bound in io-state.
                self.log_store.purge(upto.clone()).await?;
                self.engine.state.io_state_mut().update_purged(Some(upto));
            }
            Command::DeleteConflictLog { since } => {
                self.log_store.truncate(since.clone()).await?;

                // Inform clients waiting for logs to be applied.
                // Responders for the truncated indexes (`>= since.index`) are redirected to the
                // current leader, since this node can no longer commit those entries.
                let removed = self.client_resp_channels.split_off(&since.index);
                if !removed.is_empty() {
                    let leader_id = self.current_leader();
                    let leader_node = self.get_leader_node(leader_id.clone());

                    // False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
                    #[allow(clippy::let_underscore_future)]
                    let _ = C::spawn(async move {
                        for (log_index, tx) in removed.into_iter() {
                            tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
                                leader_id: leader_id.clone(),
                                leader_node: leader_node.clone(),
                            })));

                            tracing::debug!("sent ForwardToLeader for log_index: {}", log_index,);
                        }
                    });
                }
            }
            Command::SendVote { vote_req } => {
                self.spawn_parallel_vote_requests(&vote_req).await;
            }
            Command::ReplicateCommitted { committed } => {
                // Broadcast the committed log id to every replication stream; a closed stream's
                // send error is intentionally ignored.
                for node in self.replications.values() {
                    let _ = node.tx_repl.send(Replicate::Committed(committed.clone()));
                }
            }
            Command::Commit {
                seq,
                ref already_committed,
                ref upto,
            } => {
                // Persist the committed bound first, then apply the range
                // `(already_committed, upto]` to the state machine.
                self.log_store.save_committed(Some(upto.clone())).await?;
                self.apply_to_state_machine(seq, already_committed.next_index(), upto.index).await?;
            }
            Command::Replicate { req, target } => {
                let node = self.replications.get(&target).expect("replication to target node exists");

                match req {
                    Inflight::None => {
                        // Nothing in flight: send a heartbeat to keep the stream active.
                        let _ = node.tx_repl.send(Replicate::Heartbeat);
                    }
                    Inflight::Logs { id, log_id_range } => {
                        let _ = node.tx_repl.send(Replicate::logs(RequestId::new_append_entries(id), log_id_range));
                    }
                    Inflight::Snapshot { id, last_log_id } => {
                        // unwrap: The replication channel must not be dropped or it is a bug.
                        node.tx_repl.send(Replicate::snapshot(RequestId::new_snapshot(id), last_log_id)).map_err(
                            |_e| StorageIOError::read_snapshot(None, AnyError::error("replication channel closed")),
                        )?;
                    }
                }
            }
            Command::RebuildReplicationStreams { targets } => {
                // Tear down every existing stream, then spawn a fresh one per target.
                self.remove_all_replication().await;

                for (target, matching) in targets.iter() {
                    let handle = self.spawn_replication_stream(target.clone(), matching.clone()).await;
                    self.replications.insert(target.clone(), handle);
                }
            }
            Command::StateMachine { command } => {
                // Just forward a state machine command to the worker.
                self.sm_handle.send(command).map_err(|_e| {
                    StorageIOError::write_state_machine(AnyError::error("can not send to sm::Worker".to_string()))
                })?;
            }
            Command::Respond { resp: send, .. } => {
                send.send();
            }
        }

        Ok(None)
    }
}