use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyerror::AnyError;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryFutureExt;
use maplit::btreeset;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;

use crate::async_runtime::AsyncOneshotSendExt;
use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::core::balancer::Balancer;
use crate::core::command_state::CommandState;
use crate::core::notify::Notify;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ClientReadTx;
use crate::core::raft_msg::RaftMsg;
use crate::core::raft_msg::ResultSender;
use crate::core::raft_msg::VoteTx;
use crate::core::sm;
use crate::core::sm::handle;
use crate::core::sm::CommandSeq;
use crate::core::ServerState;
use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySlice;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::Engine;
use crate::engine::Respond;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::metrics::RaftServerMetrics;
use crate::metrics::ReplicationMetrics;
use crate::network::RPCOption;
use crate::network::RPCTypes;
use crate::network::RaftNetwork;
use crate::network::RaftNetworkFactory;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::quorum::QuorumSet;
use crate::raft::responder::Responder;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::LogIOId;
use crate::raft_state::LogStateReader;
use crate::replication;
use crate::replication::request::Replicate;
use crate::replication::request_id::RequestId;
use crate::replication::response::ReplicationResult;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
use crate::replication::ReplicationSessionId;
use crate::runtime::RaftRuntime;
use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::TypeConfigExt;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
use crate::Node;
use crate::NodeId;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StorageIOError;
use crate::Vote;

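/// A log entry that is being applied to the state machine, together with the
/// membership config it contains, if any.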
#[derive(Debug)]
pub(crate) struct ApplyingEntry<NID: NodeId, N: Node> {
    log_id: LogId<NID>,
    membership: Option<Membership<NID, N>>,
}

impl<NID: NodeId, N: Node> ApplyingEntry<NID, N> {
    pub(crate) fn new(log_id: LogId<NID>, membership: Option<Membership<NID, N>>) -> Self {
        Self { log_id, membership }
    }
}

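/// The result of applying a chunk of log entries to the state machine.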
pub(crate) struct ApplyResult<C: RaftTypeConfig> {
    pub(crate) since: u64,
    pub(crate) end: u64,
    pub(crate) last_applied: LogId<C::NodeId>,
    pub(crate) applying_entries: Vec<ApplyingEntry<C::NodeId, C::Node>>,
    pub(crate) apply_results: Vec<C::R>,
}

impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ApplyResult")
            .field("since", &self.since)
            .field("end", &self.end)
            .field("last_applied", &self.last_applied)
            .finish()
    }
}

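/// Data specific to the leader state.
///
/// It is created when this node becomes the leader and dropped when it quits leadership.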
pub(crate) struct LeaderData<C: RaftTypeConfig> {
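    /// The time to send the next heartbeat.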
    pub(crate) next_heartbeat: InstantOf<C>,
}

impl<C: RaftTypeConfig> LeaderData<C> {
    pub(crate) fn new() -> Self {
        Self {
            next_heartbeat: C::now(),
        }
    }
}

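/// The core task of a Raft node.
///
/// It runs the protocol loop: feeding API messages and internal notifications into the
/// `Engine`, and executing the `Command`s the engine produces against the log store,
/// the state machine worker and the network.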
pub struct RaftCore<C, N, LS, SM>
where
    C: RaftTypeConfig,
    N: RaftNetworkFactory<C>,
    LS: RaftLogStorage<C>,
    SM: RaftStateMachine<C>,
{
    pub(crate) id: C::NodeId,

    pub(crate) config: Arc<Config>,

    pub(crate) runtime_config: Arc<RuntimeConfig>,

    pub(crate) network: N,

    pub(crate) log_store: LS,

    pub(crate) sm_handle: handle::Handle<C>,

    pub(crate) engine: Engine<C>,

    pub(crate) client_resp_channels: BTreeMap<u64, ResponderOf<C>>,

    pub(crate) replications: BTreeMap<C::NodeId, ReplicationHandle<C>>,

    pub(crate) leader_data: Option<LeaderData<C>>,

    #[allow(dead_code)]
    pub(crate) tx_api: mpsc::UnboundedSender<RaftMsg<C>>,
    pub(crate) rx_api: mpsc::UnboundedReceiver<RaftMsg<C>>,

    pub(crate) tx_notify: mpsc::UnboundedSender<Notify<C>>,

    pub(crate) rx_notify: mpsc::UnboundedReceiver<Notify<C>>,

    pub(crate) tx_metrics: watch::Sender<RaftMetrics<C::NodeId, C::Node>>,
    pub(crate) tx_data_metrics: watch::Sender<RaftDataMetrics<C::NodeId>>,
    pub(crate) tx_server_metrics: watch::Sender<RaftServerMetrics<C::NodeId, C::Node>>,

    pub(crate) command_state: CommandState,

    pub(crate) span: Span,

    pub(crate) _p: PhantomData<SM>,
}

impl<C, N, LS, SM> RaftCore<C, N, LS, SM>
where
    C: RaftTypeConfig,
    N: RaftNetworkFactory<C>,
    LS: RaftLogStorage<C>,
    SM: RaftStateMachine<C>,
{
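    /// The main loop of the Raft node.
    ///
    /// It runs `do_main()` and, on exit, flushes a final metrics report before shutting down.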
    pub(crate) async fn main(
        mut self,
        rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
    ) -> Result<Infallible, Fatal<C::NodeId>> {
        let span = tracing::span!(parent: &self.span, Level::DEBUG, "main");
        let res = self.do_main(rx_shutdown).instrument(span).await;

        self.report_metrics(None);

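        // Safe unwrap_err(): `res` is `Result<Infallible, _>`, thus it can only hold an error.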
        let err = res.unwrap_err();
        match err {
            Fatal::Stopped => {}
            _ => {
                tracing::error!(error = display(&err), "quit RaftCore::main on error");
            }
        }

        tracing::debug!("update the metrics for shutdown");
        {
            let mut curr = self.tx_metrics.borrow().clone();
            curr.state = ServerState::Shutdown;
            curr.running_state = Err(err.clone());

            let _ = self.tx_metrics.send(curr);
        }

        tracing::info!("RaftCore shutdown complete");

        Err(err)
    }

    #[tracing::instrument(level="trace", skip_all, fields(id=display(&self.id), cluster=%self.config.cluster_name))]
    async fn do_main(
        &mut self,
        rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
    ) -> Result<Infallible, Fatal<C::NodeId>> {
        tracing::debug!("raft node is initializing");

        self.engine.startup();
        self.run_engine_commands().await?;

        self.report_metrics(None);

        self.runtime_loop(rx_shutdown).await
    }

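    /// Handle a `check_is_leader` (linearizable read) request.
    ///
    /// To confirm this node is still the leader, a blank AppendEntries is sent to every other
    /// voter; the response is returned once a quorum (including this node) acknowledges it.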
    #[tracing::instrument(level = "trace", skip(self, tx))]
    pub(super) async fn handle_check_is_leader_request(&mut self, tx: ClientReadTx<C>) {
        let resp = {
            let l = self.engine.leader_handler();
            let lh = match l {
                Ok(leading_handler) => leading_handler,
                Err(forward) => {
                    let _ = tx.send(Err(forward.into()));
                    return;
                }
            };

            let read_log_id = lh.get_read_log_id();

            let applied = self.engine.state.io_applied().cloned();

            (read_log_id, applied)
        };

        let my_id = self.id.clone();
        let my_vote = self.engine.state.vote_ref().clone();
        let ttl = Duration::from_millis(self.config.heartbeat_interval);
        let eff_mem = self.engine.state.membership_state.effective().clone();
        let core_tx = self.tx_notify.clone();

        let mut granted = btreeset! {my_id.clone()};

        if eff_mem.is_quorum(granted.iter()) {
            let _ = tx.send(Ok(resp));
            return;
        }

        let mut pending = FuturesUnordered::new();

        let voter_progresses = {
            let l = self.engine.leader.as_ref().unwrap();
            l.progress.iter().filter(|(id, _v)| l.progress.is_voter(id) == Some(true))
        };

        for (target, progress) in voter_progresses {
            let target = target.clone();

            if target == my_id {
                continue;
            }

            let rpc = AppendEntriesRequest {
                vote: my_vote.clone(),
                prev_log_id: progress.matching.clone(),
                entries: vec![],
                leader_commit: self.engine.state.committed().cloned(),
            };

            let target_node = eff_mem.get_node(&target).unwrap().clone();
            let mut client = self.network.new_client(target.clone(), &target_node).await;

            let option = RPCOption::new(ttl);

            let fu = {
                let my_id = my_id.clone();
                let target = target.clone();
                async move {
                    let outer_res = C::AsyncRuntime::timeout(ttl, client.append_entries(rpc, option)).await;
                    match outer_res {
                        Ok(append_res) => match append_res {
                            Ok(x) => Ok((target.clone(), x)),
                            Err(err) => Err((target.clone(), err)),
                        },
                        Err(_timeout) => {
                            let timeout_err = Timeout {
                                action: RPCTypes::AppendEntries,
                                id: my_id,
                                target: target.clone(),
                                timeout: ttl,
                            };

                            Err((target, RPCError::Timeout(timeout_err)))
                        }
                    }
                }
            };

            let fu = fu.instrument(tracing::debug_span!("spawn_is_leader", target = target.to_string()));
            let task = C::AsyncRuntime::spawn(fu).map_err(move |err| (target, err));

            pending.push(task);
        }

        let waiting_fu = async move {
            while let Some(res) = pending.next().await {
                let (target, append_res) = match res {
                    Ok(Ok(res)) => res,
                    Ok(Err((target, err))) => {
                        tracing::error!(target=display(&target), error=%err, "RPC error while confirming leadership for read request");
                        continue;
                    }
                    Err((target, err)) => {
                        tracing::error!(target = display(&target), "fail to join task: {}", err);
                        continue;
                    }
                };

                if let AppendEntriesResponse::HigherVote(vote) = append_res {
                    debug_assert!(
                        vote > my_vote,
                        "committed vote({}) has total order relation with other votes({})",
                        my_vote,
                        vote
                    );

                    let send_res = core_tx.send(Notify::HigherVote {
                        target,
                        higher: vote,
                        sender_vote: my_vote,
                    });

                    if let Err(_e) = send_res {
                        tracing::error!("fail to send HigherVote to RaftCore");
                    }

                    let err = ForwardToLeader::empty();
                    let _ = tx.send(Err(err.into()));
                    return;
                }

                granted.insert(target);

                if eff_mem.is_quorum(granted.iter()) {
                    let _ = tx.send(Ok(resp));
                    return;
                }
            }

            let _ = tx.send(Err(QuorumNotEnough {
                cluster: eff_mem.membership().summary(),
                got: granted,
            }
            .into()));
        };

        #[allow(clippy::let_underscore_future)]
        let _ = C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));
    }

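    /// Submit a change-membership request.
    ///
    /// It validates the requested changes against the current effective membership, then
    /// proposes the new membership config as a log entry.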
    #[tracing::instrument(level = "debug", skip(self, tx))]
    pub(super) fn change_membership(
        &mut self,
        changes: ChangeMembers<C::NodeId, C::Node>,
        retain: bool,
        tx: ResponderOf<C>,
    ) {
        let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
        let new_membership = match res {
            Ok(x) => x,
            Err(e) => {
                tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
                return;
            }
        };

        let ent = C::Entry::new_membership(LogId::default(), new_membership);
        self.write_entry(ent, Some(tx));
    }

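    /// Write a log entry to the cluster through the raft protocol.
    ///
    /// Returns `true` if the entry is accepted, i.e., this node is the leader; otherwise the
    /// caller is answered with a `ForwardToLeader` error through `resp_tx`, if it is provided.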
    #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))]
    pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<ResponderOf<C>>) -> bool {
        tracing::debug!(payload = display(&entry), "write_entry");

        let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) {
            (lh, tx)
        } else {
            return false;
        };

        let entries = vec![entry];
        lh.leader_append_entries(entries);
        let index = lh.state.last_log_id().unwrap().index;

        if let Some(tx) = tx {
            self.client_resp_channels.insert(index, tx);
        }

        true
    }

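    /// Send a heartbeat to all followers and learners.
    ///
    /// Returns `true` if this node is the leader and the heartbeat is triggered.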
    #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))]
    pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
        tracing::debug!(now = debug(C::now()), "send_heartbeat");

        let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
            lh
        } else {
            tracing::debug!(now = debug(C::now()), "{} failed to send heartbeat", emitter);
            return false;
        };

        lh.send_heartbeat();

        tracing::debug!("{} triggered sending heartbeat", emitter);
        true
    }

    #[tracing::instrument(level = "debug", skip_all)]
    pub fn flush_metrics(&mut self) {
        let leader_metrics = if let Some(leader) = self.engine.leader.as_ref() {
            let prog = &leader.progress;
            Some(
                prog.iter()
                    .map(|(id, p)| {
                        (
                            id.clone(),
                            <ProgressEntry<<C as RaftTypeConfig>::NodeId> as Borrow<Option<LogId<C::NodeId>>>>::borrow(
                                p,
                            )
                            .clone(),
                        )
                    })
                    .collect(),
            )
        } else {
            None
        };
        self.report_metrics(leader_metrics);
    }

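    /// Report a metrics payload on the current state of the Raft node.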
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn report_metrics(&mut self, replication: Option<ReplicationMetrics<C::NodeId>>) {
        let last_quorum_acked = self.last_quorum_acked_time();
        let millis_since_quorum_ack = last_quorum_acked.map(|t| t.elapsed().as_millis() as u64);

        let st = &self.engine.state;

        let membership_config = st.membership_state.effective().stored_membership().clone();
        let current_leader = self.current_leader();

        let m = RaftMetrics {
            running_state: Ok(()),
            id: self.id.clone(),

            current_term: st.vote_ref().leader_id().get_term(),
            vote: st.io_state().vote().clone(),
            last_log_index: st.last_log_id().index(),
            last_applied: st.io_applied().cloned(),
            snapshot: st.io_snapshot_last_log_id().cloned(),
            purged: st.io_purged().cloned(),

            state: st.server_state,
            current_leader: current_leader.clone(),
            millis_since_quorum_ack,
            membership_config: membership_config.clone(),

            replication: replication.clone(),
        };

        let data_metrics = RaftDataMetrics {
            last_log: st.last_log_id().cloned(),
            last_applied: st.io_applied().cloned(),
            snapshot: st.io_snapshot_last_log_id().cloned(),
            purged: st.io_purged().cloned(),
            millis_since_quorum_ack,
            replication,
        };

        let server_metrics = RaftServerMetrics {
            id: self.id.clone(),
            vote: st.io_state().vote().clone(),
            state: st.server_state,
            current_leader,
            membership_config,
        };

        self.tx_data_metrics.send_if_modified(|metrics| {
            if data_metrics.ne(metrics) {
                *metrics = data_metrics.clone();
                return true;
            }
            false
        });

        self.tx_server_metrics.send_if_modified(|metrics| {
            if server_metrics.ne(metrics) {
                *metrics = server_metrics.clone();
                return true;
            }
            false
        });

        tracing::debug!("report_metrics: {}", m.summary());
        let res = self.tx_metrics.send(m);

        if let Err(err) = res {
            tracing::error!(error=%err, id=display(&self.id), "error reporting metrics");
        }
    }

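    /// Handle the admin command `initialize`: submit the very first membership log entry to
    /// bootstrap the cluster.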
    #[tracing::instrument(level = "debug", skip(self, tx))]
    pub(crate) fn handle_initialize(
        &mut self,
        member_nodes: BTreeMap<C::NodeId, C::Node>,
        tx: ResultSender<C, (), InitializeError<C::NodeId, C::Node>>,
    ) {
        tracing::debug!(member_nodes = debug(&member_nodes), "{}", func_name!());

        let membership = Membership::from(member_nodes);

        let entry = C::Entry::new_membership(LogId::default(), membership);
        let res = self.engine.initialize(entry);
        self.engine.output.push_command(Command::Respond {
            when: None,
            resp: Respond::new(res, tx),
        });
    }

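    /// Trigger the state machine worker to build a snapshot.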
    #[tracing::instrument(level = "debug", skip(self))]
    pub(crate) fn trigger_snapshot(&mut self) {
        tracing::debug!("{}", func_name!());
        self.engine.snapshot_handler().trigger_snapshot();
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub(crate) fn current_leader(&self) -> Option<C::NodeId> {
        tracing::debug!(
            self_id = display(&self.id),
            vote = display(self.engine.state.vote_ref().summary()),
            "get current_leader"
        );

        let vote = self.engine.state.vote_ref();

        if !vote.is_committed() {
            return None;
        }

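        // Safe unwrap(): a committed vote must have voted for some node.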
        let id = vote.leader_id().voted_for().unwrap();

        if self.engine.state.membership_state.effective().is_voter(&id) {
            Some(id)
        } else {
            tracing::debug!("id={} is not a voter", id);
            None
        }
    }

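    /// Retrieve the most recent timestamp at which leadership was acknowledged by a quorum.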
    fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
        let leading = self.engine.leader.as_mut();
        leading.and_then(|l| l.last_quorum_acked_time())
    }

    pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<C::Node> {
        let leader_id = leader_id?;

        self.engine.state.membership_state.effective().get_node(&leader_id).cloned()
    }

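    /// Append log entries to the log store and wait until they are flushed to disk.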
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) async fn append_to_log<I>(
        &mut self,
        entries: I,
        vote: Vote<C::NodeId>,
        last_log_id: LogId<C::NodeId>,
    ) -> Result<(), StorageError<C::NodeId>>
    where
        I: IntoIterator<Item = C::Entry> + OptionalSend,
        I::IntoIter: OptionalSend,
    {
        tracing::debug!("append_to_log");

        let (tx, rx) = C::AsyncRuntime::oneshot();
        let log_io_id = LogIOId::new(vote, Some(last_log_id));

        let callback = LogFlushed::new(log_io_id, tx);

        self.log_store.append(entries, callback).await?;
        rx.await
            .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
            .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
        Ok(())
    }

    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) async fn apply_to_state_machine(
        &mut self,
        seq: CommandSeq,
        since: u64,
        upto_index: u64,
    ) -> Result<(), StorageError<C::NodeId>> {
        tracing::debug!(upto_index = display(upto_index), "{}", func_name!());

        let end = upto_index + 1;

        debug_assert!(
            since <= end,
            "last_applied index {} must be <= committed index {}",
            since,
            end
        );

        if since == end {
            return Ok(());
        }

        let entries = self.log_store.get_log_entries(since..end).await?;
        tracing::debug!(
            entries = display(DisplaySlice::<_>(entries.as_slice())),
            "about to apply"
        );

        let last_applied = entries[entries.len() - 1].get_log_id().clone();

        let cmd = sm::Command::apply(entries).with_seq(seq);
        self.sm_handle.send(cmd).map_err(|e| StorageIOError::apply(last_applied, AnyError::error(e)))?;

        Ok(())
    }

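    /// Handle the result of applying log entries to the state machine: respond to the clients
    /// that are waiting for these entries.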
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn handle_apply_result(&mut self, res: ApplyResult<C>) {
        tracing::debug!(last_applied = display(res.last_applied), "{}", func_name!());

        let mut results = res.apply_results.into_iter();
        let mut applying_entries = res.applying_entries.into_iter();

        for log_index in res.since..res.end {
            let ent = applying_entries.next().unwrap();
            let apply_res = results.next().unwrap();
            let tx = self.client_resp_channels.remove(&log_index);

            Self::send_response(ent, apply_res, tx);
        }
    }

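    /// Send the result of applying a log entry to the client that proposed it, if a response
    /// channel is present.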
    #[tracing::instrument(level = "debug", skip_all)]
    pub(super) fn send_response(entry: ApplyingEntry<C::NodeId, C::Node>, resp: C::R, tx: Option<ResponderOf<C>>) {
        tracing::debug!(entry = debug(&entry), "send_response");

        let tx = match tx {
            None => return,
            Some(x) => x,
        };

        let membership = entry.membership;

        let res = Ok(ClientWriteResponse {
            log_id: entry.log_id,
            data: resp,
            membership,
        });

        tx.send(res);
    }

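    /// Spawn a new replication stream to the given target, returning a handle for it.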
    #[tracing::instrument(level = "debug", skip(self))]
    #[allow(clippy::type_complexity)]
    pub(crate) async fn spawn_replication_stream(
        &mut self,
        target: C::NodeId,
        progress_entry: ProgressEntry<C::NodeId>,
    ) -> ReplicationHandle<C> {
        let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap();

        let membership_log_id = self.engine.state.membership_state.effective().log_id();
        let network = self.network.new_client(target.clone(), target_node).await;
        let snapshot_network = self.network.new_client(target.clone(), target_node).await;

        let leader = self.engine.leader.as_ref().unwrap();

        let session_id = ReplicationSessionId::new(leader.vote.clone(), membership_log_id.clone());

        ReplicationCore::<C, N, LS>::spawn(
            target.clone(),
            session_id,
            self.config.clone(),
            self.engine.state.committed().cloned(),
            progress_entry.matching,
            network,
            snapshot_network,
            self.log_store.get_log_reader().await,
            self.sm_handle.new_snapshot_reader(),
            self.tx_notify.clone(),
            tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(&self.id), target=display(&target)),
        )
    }

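    /// Remove all replication states, shutting down the replication tasks and waiting for
    /// them to finish.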
    #[tracing::instrument(level = "debug", skip_all)]
    pub async fn remove_all_replication(&mut self) {
        tracing::info!("remove all replication");

        let nodes = std::mem::take(&mut self.replications);

        tracing::debug!(
            targets = debug(nodes.iter().map(|x| x.0.clone()).collect::<Vec<_>>()),
            "remove all targets from replication_metrics"
        );

        for (target, s) in nodes {
            let handle = s.join_handle;

            drop(s.tx_repl);

            tracing::debug!("joining removed replication: {}", target);
            let _x = handle.await;
            tracing::info!("Done joining removed replication : {}", target);
        }
    }

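    /// Run all commands currently queued by the engine.
    ///
    /// If a command has an unmet precondition, it is postponed and this method returns early;
    /// the remaining commands will run once the condition is satisfied.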
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) async fn run_engine_commands(&mut self) -> Result<(), StorageError<C::NodeId>> {
        if tracing::enabled!(Level::DEBUG) {
            tracing::debug!("queued commands: start...");
            for c in self.engine.output.iter_commands() {
                tracing::debug!("queued commands: {:?}", c);
            }
            tracing::debug!("queued commands: end...");
        }

        while let Some(cmd) = self.engine.output.pop_command() {
            tracing::debug!("run command: {:?}", cmd);

            let res = self.run_command(cmd).await?;

            if let Some(cmd) = res {
                tracing::debug!("early return: postpone command: {:?}", cmd);
                self.engine.output.postpone_command(cmd);

                if tracing::enabled!(Level::DEBUG) {
                    for c in self.engine.output.iter_commands().take(8) {
                        tracing::debug!("postponed, first 8 queued commands: {:?}", c);
                    }
                }

                return Ok(());
            }
        }

        Ok(())
    }

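    /// The run loop of the Raft node: it drains API messages and internal notifications, and
    /// executes the resulting engine commands, until shutdown is requested.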
    #[tracing::instrument(level="debug", skip_all, fields(id=display(&self.id)))]
    async fn runtime_loop(
        &mut self,
        mut rx_shutdown: <C::AsyncRuntime as AsyncRuntime>::OneshotReceiver<()>,
    ) -> Result<Infallible, Fatal<C::NodeId>> {
        let mut balancer = Balancer::new(10_000);

        loop {
            self.flush_metrics();

            select! {
                biased;

                _ = &mut rx_shutdown => {
                    tracing::info!("recv from rx_shutdown");
                    return Err(Fatal::Stopped);
                }

                notify_res = self.rx_notify.recv() => {
                    match notify_res {
                        Some(notify) => self.handle_notify(notify)?,
                        None => {
                            tracing::error!("all rx_notify senders are dropped");
                            return Err(Fatal::Stopped);
                        }
                    };
                }

                msg_res = self.rx_api.recv() => {
                    match msg_res {
                        Some(msg) => self.handle_api_msg(msg).await,
                        None => {
                            tracing::info!("all rx_api senders are dropped");
                            return Err(Fatal::Stopped);
                        }
                    };
                }
            }

            self.run_engine_commands().await?;

            let raft_msg_processed = self.process_raft_msg(balancer.raft_msg()).await?;
            let notify_processed = self.process_notify(balancer.notify()).await?;

            #[allow(clippy::collapsible_else_if)]
            if notify_processed == balancer.notify() {
                tracing::info!("there may be more Notify to process, increase Notify ratio");
                balancer.increase_notify();
            } else {
                if raft_msg_processed == balancer.raft_msg() {
                    tracing::info!("there may be more RaftMsg to process, increase RaftMsg ratio");
                    balancer.increase_raft_msg();
                }
            }
        }
    }

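    /// Process RaftMsg from the API channel, at most `at_most` of them.
    ///
    /// The returned count is fed back to the `Balancer` to adjust the ratio between RaftMsg
    /// and Notify processing.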
    async fn process_raft_msg(&mut self, at_most: u64) -> Result<u64, Fatal<C::NodeId>> {
        for i in 0..at_most {
            let res = self.rx_api.try_recv();
            let msg = match res {
                Ok(msg) => msg,
                Err(e) => match e {
                    mpsc::error::TryRecvError::Empty => {
                        tracing::debug!("all RaftMsg are processed, wait for more");
                        return Ok(i + 1);
                    }
                    mpsc::error::TryRecvError::Disconnected => {
                        tracing::debug!("rx_api is disconnected, quit");
                        return Err(Fatal::Stopped);
                    }
                },
            };

            self.handle_api_msg(msg).await;

            self.run_engine_commands().await?;
        }

        tracing::debug!("at_most({}) reached, there may be more queued RaftMsg to process", at_most);

        Ok(at_most)
    }

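    /// Process Notify events from internal tasks, at most `at_most` of them.
    ///
    /// Like `process_raft_msg()`, the returned count is fed back to the `Balancer`.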
    async fn process_notify(&mut self, at_most: u64) -> Result<u64, Fatal<C::NodeId>> {
        for i in 0..at_most {
            let res = self.rx_notify.try_recv();
            let notify = match res {
                Ok(msg) => msg,
                Err(e) => match e {
                    mpsc::error::TryRecvError::Empty => {
                        tracing::debug!("all Notify are processed, wait for more");
                        return Ok(i + 1);
                    }
                    mpsc::error::TryRecvError::Disconnected => {
                        tracing::error!("rx_notify is disconnected, quit");
                        return Err(Fatal::Stopped);
                    }
                },
            };

            self.handle_notify(notify)?;

            self.run_engine_commands().await?;
        }

        tracing::debug!("at_most({}) reached, there may be more queued Notify to process", at_most);

        Ok(at_most)
    }

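    /// Spawn parallel vote requests to all voters other than this node.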
    #[tracing::instrument(level = "trace", skip_all, fields(vote=vote_req.summary()))]
    async fn spawn_parallel_vote_requests(&mut self, vote_req: &VoteRequest<C::NodeId>) {
        let members = self.engine.state.membership_state.effective().voter_ids();

        let vote = vote_req.vote.clone();

        for target in members {
            if target == self.id {
                continue;
            }

            let req = vote_req.clone();

            let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone();
            let mut client = self.network.new_client(target.clone(), &target_node).await;

            let tx = self.tx_notify.clone();

            let ttl = Duration::from_millis(self.config.election_timeout_min);
            let id = self.id.clone();
            let option = RPCOption::new(ttl);

            #[allow(clippy::let_underscore_future)]
            let _ = C::AsyncRuntime::spawn(
                {
                    let target = target.clone();
                    let vote = vote.clone();

                    async move {
                        let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await;
                        let res = match tm_res {
                            Ok(res) => res,

                            Err(_timeout) => {
                                let timeout_err = Timeout {
                                    action: RPCTypes::Vote,
                                    id,
                                    target: target.clone(),
                                    timeout: ttl,
                                };
                                tracing::error!({error = %timeout_err, target = display(&target)}, "timeout");
                                return;
                            }
                        };

                        match res {
                            Ok(resp) => {
                                let _ = tx.send(Notify::VoteResponse {
                                    target,
                                    resp,
                                    sender_vote: vote,
                                });
                            }
                            Err(err) => tracing::error!({error=%err, target=display(&target)}, "while requesting vote"),
                        }
                    }
                }
                .instrument(tracing::debug_span!(
                    parent: &Span::current(),
                    "send_vote_req",
                    target = display(&target)
                )),
            );
        }
    }

    #[tracing::instrument(level = "debug", skip_all)]
    pub(super) fn handle_vote_request(&mut self, req: VoteRequest<C::NodeId>, tx: VoteTx<C>) {
        tracing::info!(req = display(req.summary()), func = func_name!());

        let resp = self.engine.handle_vote_req(req);
        self.engine.output.push_command(Command::Respond {
            when: None,
            resp: Respond::new(Ok(resp), tx),
        });
    }

    #[tracing::instrument(level = "debug", skip_all)]
    pub(super) fn handle_append_entries_request(&mut self, req: AppendEntriesRequest<C>, tx: AppendEntriesTx<C>) {
        tracing::debug!(req = display(req.summary()), func = func_name!());

        let is_ok = self.engine.handle_append_entries(&req.vote, req.prev_log_id, req.entries, Some(tx));

        if is_ok {
            self.engine.handle_commit_entries(req.leader_commit);
        }
    }

    #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(&self.id)))]
    pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C>) {
        tracing::debug!("recv from rx_api: {}", msg.summary());

        match msg {
            RaftMsg::AppendEntries { rpc, tx } => {
                self.handle_append_entries_request(rpc, tx);
            }
            RaftMsg::RequestVote { rpc, tx } => {
                let now = C::now();
                tracing::info!(
                    now = display(now.display()),
                    vote_request = display(&rpc),
                    "received RaftMsg::RequestVote: {}",
                    func_name!()
                );

                self.handle_vote_request(rpc, tx);
            }
            RaftMsg::BeginReceivingSnapshot { tx } => {
                self.engine.handle_begin_receiving_snapshot(tx);
            }
            RaftMsg::InstallFullSnapshot { vote, snapshot, tx } => {
                self.engine.handle_install_full_snapshot(vote, snapshot, tx);
            }
            RaftMsg::CheckIsLeaderRequest { tx } => {
                self.handle_check_is_leader_request(tx).await;
            }
            RaftMsg::ClientWriteRequest { app_data, tx } => {
                self.write_entry(C::Entry::from_app_data(app_data), Some(tx));
            }
            RaftMsg::Initialize { members, tx } => {
                tracing::info!(
                    members = debug(&members),
                    "received RaftMsg::Initialize: {}",
                    func_name!()
                );

                self.handle_initialize(members, tx);
            }
            RaftMsg::ChangeMembership { changes, retain, tx } => {
                tracing::info!(
                    members = debug(&changes),
                    retain = debug(&retain),
                    "received RaftMsg::ChangeMembership: {}",
                    func_name!()
                );

                self.change_membership(changes, retain, tx);
            }
            RaftMsg::ExternalCoreRequest { req } => {
                req(&self.engine.state);
            }
            RaftMsg::ExternalCommand { cmd } => {
                tracing::info!(cmd = debug(&cmd), "received RaftMsg::ExternalCommand: {}", func_name!());

                match cmd {
                    ExternalCommand::Elect => {
                        if self.engine.state.membership_state.effective().is_voter(&self.id) {
                            self.engine.elect();
                            tracing::debug!("ExternalCommand: triggered election");
                        }
                    }
                    ExternalCommand::Heartbeat => {
                        self.send_heartbeat("ExternalCommand");
                    }
                    ExternalCommand::Snapshot => self.trigger_snapshot(),
                    ExternalCommand::GetSnapshot { tx } => {
                        let cmd = sm::Command::get_snapshot(tx);
                        let res = self.sm_handle.send(cmd);
                        if let Err(e) = res {
                            tracing::error!(error = display(e), "error sending GetSnapshot to sm worker");
                        }
                    }
                    ExternalCommand::PurgeLog { upto } => {
                        self.engine.trigger_purge_log(upto);
                    }
                }
            }
        };
    }

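    /// Handle a Notify event sent by internal tasks: replication streams, the state machine
    /// worker, or the tick loop.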
    #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(&self.id)))]
    pub(crate) fn handle_notify(&mut self, notify: Notify<C>) -> Result<(), Fatal<C::NodeId>> {
        tracing::debug!("recv from rx_notify: {}", notify.summary());

        match notify {
            Notify::VoteResponse {
                target,
                resp,
                sender_vote,
            } => {
                let now = C::now();

                tracing::info!(
                    now = display(now.display()),
                    resp = display(&resp),
                    "received Notify::VoteResponse: {}",
                    func_name!()
                );

                if self.does_vote_match(&sender_vote, "VoteResponse") {
                    self.engine.handle_vote_resp(target, resp);
                }
            }

            Notify::HigherVote {
                target,
                higher,
                sender_vote,
            } => {
                tracing::info!(
                    target = display(&target),
                    higher_vote = display(&higher),
                    sending_vote = display(&sender_vote),
                    "received Notify::HigherVote: {}",
                    func_name!()
                );

                if self.does_vote_match(&sender_vote, "HigherVote") {
                    let _ = self.engine.vote_handler().update_vote(&higher);
                }
            }

            Notify::Tick { i } => {
                let now = C::now();
                tracing::debug!("received tick: {}, now: {:?}", i, now);

                self.handle_tick_election();

                let heartbeat_at = self.leader_data.as_ref().map(|x| x.next_heartbeat);
                if let Some(t) = heartbeat_at {
                    if now >= t {
                        if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) {
                            self.send_heartbeat("tick");
                        }

                        if let Some(l) = &mut self.leader_data {
                            l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval);
                        }
                    }
                }

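                // Leader step down: once the effective membership (which may have removed this
                // leader) is applied, let the engine decide whether to revert to a non-leader.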
                if self.engine.state.io_applied() >= self.engine.state.membership_state.effective().log_id().as_ref() {
                    self.engine.leader_step_down();
                }
            }

            Notify::Network { response } => {
                match response {
                    replication::Response::Progress {
                        target,
                        request_id: id,
                        result,
                        session_id,
                    } => {
                        if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") {
                            self.handle_replication_progress(target, id, result);
                        }
                    }

                    replication::Response::StorageError { error } => {
                        tracing::error!(
                            error = display(&error),
                            "received Notify::ReplicationStorageError: {}",
                            func_name!()
                        );

                        return Err(Fatal::from(error));
                    }

                    replication::Response::HigherVote {
                        target,
                        higher,
                        sender_vote,
                    } => {
                        tracing::info!(
                            target = display(&target),
                            higher_vote = display(&higher),
                            sender_vote = display(&sender_vote),
                            "received Notify::HigherVote: {}",
                            func_name!()
                        );

                        if self.does_vote_match(&sender_vote, "HigherVote") {
                            let _ = self.engine.vote_handler().update_vote(&higher);
                        }
                    }
                }
            }

            Notify::StateMachine { command_result } => {
                tracing::debug!("sm::StateMachine command result: {:?}", command_result);

                let seq = command_result.command_seq;
                let res = command_result.result?;

                match res {
                    sm::Response::BuildSnapshot(_) => {}
                    _ => {
                        debug_assert!(
                            self.command_state.finished_sm_seq < seq,
                            "sm::StateMachine command result is out of order: expect {} < {}",
                            self.command_state.finished_sm_seq,
                            seq
                        );
                    }
                }
                self.command_state.finished_sm_seq = seq;

                match res {
                    sm::Response::BuildSnapshot(meta) => {
                        tracing::info!(
                            "sm::StateMachine command done: BuildSnapshot: {}: {}",
                            meta.summary(),
                            func_name!()
                        );

                        let last_log_id = meta.last_log_id.clone();
                        self.engine.finish_building_snapshot(meta);

                        let st = self.engine.state.io_state_mut();
                        st.update_snapshot(last_log_id);
                    }
                    sm::Response::InstallSnapshot(meta) => {
                        tracing::info!(
                            "sm::StateMachine command done: InstallSnapshot: {}: {}",
                            meta.summary(),
                            func_name!()
                        );

                        if let Some(meta) = meta {
                            let st = self.engine.state.io_state_mut();
                            st.update_applied(meta.last_log_id.clone());
                            st.update_snapshot(meta.last_log_id);
                        }
                    }
                    sm::Response::Apply(res) => {
                        self.engine.state.io_state_mut().update_applied(Some(res.last_applied.clone()));

                        self.handle_apply_result(res);
                    }
                }
            }
        };
        Ok(())
    }

    #[tracing::instrument(level = "debug", skip_all)]
    fn handle_tick_election(&mut self) {
        let now = C::now();

        tracing::debug!("try to trigger election by tick, now: {:?}", now);

        if self.engine.state.server_state == ServerState::Leader {
            tracing::debug!("already a leader, do not elect again");
            return;
        }

        if !self.engine.state.membership_state.effective().is_voter(&self.id) {
            tracing::debug!("this node is not a voter");
            return;
        }

        if !self.runtime_config.enable_elect.load(Ordering::Relaxed) {
            tracing::debug!("election is disabled");
            return;
        }

        if self.engine.state.membership_state.effective().voter_ids().count() == 1 {
            tracing::debug!("this is the only voter, elect at once");
        } else {
            tracing::debug!("there are multiple voters, check election timeout");

            let current_vote = self.engine.state.vote_ref();
            let utime = self.engine.state.vote_last_modified();
            let timer_config = &self.engine.config.timer_config;

            let mut election_timeout = if current_vote.is_committed() {
                timer_config.leader_lease + timer_config.election_timeout
            } else {
                timer_config.election_timeout
            };

            if self.engine.is_there_greater_log() {
                election_timeout += timer_config.smaller_log_timeout;
            }

            tracing::debug!(
                "vote utime: {:?}, current_vote: {}, now-utime: {:?}, election_timeout: {:?}",
                utime,
                current_vote,
                utime.map(|x| now - x),
                election_timeout,
            );

            if utime > Some(now - election_timeout) {
                tracing::debug!("election timeout has not yet passed");
                return;
            }

            tracing::info!("election timeout passed, this voter will trigger an election");
        }

        self.engine.reset_greater_log();

        tracing::info!("do trigger election");
        self.engine.elect();
    }

    #[tracing::instrument(level = "debug", skip_all)]
    fn handle_replication_progress(
        &mut self,
        target: C::NodeId,
        request_id: RequestId,
        result: Result<ReplicationResult<C>, String>,
    ) {
        tracing::debug!(
            target = display(&target),
            request_id = display(request_id),
            result = debug(&result),
            "handle_replication_progress"
        );

        #[allow(clippy::collapsible_if)]
        if tracing::enabled!(Level::DEBUG) {
            if !self.replications.contains_key(&target) {
                tracing::warn!("leader has removed target: {}", target);
            }
        }

        if self.engine.leader.is_some() {
            self.engine.replication_handler().update_progress(target, request_id, result);
        }
    }

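    /// Check whether a message sent by a previous state still matches the current vote.
    ///
    /// If not, the message is stale and should be ignored.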
    fn does_vote_match(&self, sender_vote: &Vote<C::NodeId>, msg: impl Display) -> bool {
        let my_vote = if sender_vote.is_committed() {
            let l = self.engine.leader.as_ref();
            l.map(|x| x.vote.clone())
        } else {
            let candidate = self.engine.candidate_ref();
            candidate.map(|x| x.vote_ref().clone())
        };

        if Some(sender_vote) != my_vote.as_ref() {
            tracing::warn!(
                "A message will be ignored because vote changed: msg sent by vote: {}; current my vote: {}; when ({})",
                sender_vote,
                my_vote.display(),
                msg
            );
            false
        } else {
            true
        }
    }
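
    /// Check whether a replication session matches the current vote and effective membership.
    ///
    /// A message from an obsolete replication session (older vote or older membership config)
    /// is stale and should be ignored.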
    fn does_replication_session_match(
        &self,
        session_id: &ReplicationSessionId<C::NodeId>,
        msg: impl Display + Copy,
    ) -> bool {
        if !self.does_vote_match(session_id.vote_ref(), msg) {
            return false;
        }

        if &session_id.membership_log_id != self.engine.state.membership_state.effective().log_id() {
            tracing::warn!(
                "membership_log_id changed: msg sent by: {}; curr: {}; ignore when ({})",
                session_id.membership_log_id.summary(),
                self.engine.state.membership_state.effective().log_id().summary(),
                msg
            );
            return false;
        }
        true
    }
}

impl<C, N, LS, SM> RaftRuntime<C> for RaftCore<C, N, LS, SM>
where
    C: RaftTypeConfig,
    N: RaftNetworkFactory<C>,
    LS: RaftLogStorage<C>,
    SM: RaftStateMachine<C>,
{
    async fn run_command(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C::NodeId>> {
        let condition = cmd.condition();
        tracing::debug!("condition: {:?}", condition);

        if let Some(condition) = condition {
            match condition {
                Condition::LogFlushed { .. } => {
                    todo!()
                }
                Condition::Applied { log_id } => {
                    if self.engine.state.io_applied() < log_id.as_ref() {
                        tracing::debug!(
                            "log_id: {} has not yet applied, postpone cmd: {:?}",
                            DisplayOption(log_id),
                            cmd
                        );
                        return Ok(Some(cmd));
                    }
                }
                Condition::StateMachineCommand { command_seq } => {
                    if self.command_state.finished_sm_seq < *command_seq {
                        tracing::debug!(
                            "sm::Command({}) has not yet finished({}), postpone cmd: {:?}",
                            command_seq,
                            self.command_state.finished_sm_seq,
                            cmd
                        );
                        return Ok(Some(cmd));
                    }
                }
            }
        }

        match cmd {
            Command::BecomeLeader => {
                debug_assert!(self.leader_data.is_none(), "can not become leader twice");
                self.leader_data = Some(LeaderData::new());
            }
            Command::QuitLeader => {
                self.leader_data = None;
            }
            Command::AppendInputEntries { vote, entries } => {
                let last_log_id = entries.last().unwrap().get_log_id().clone();
                tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries));

                self.append_to_log(entries, vote, last_log_id.clone()).await?;

                if let Ok(mut lh) = self.engine.leader_handler() {
                    lh.replication_handler().update_local_progress(Some(last_log_id));
                }
            }
            Command::SaveVote { vote } => {
                self.log_store.save_vote(&vote).await?;
                self.engine.state.io_state_mut().update_vote(vote.clone());

                let _ = self.tx_notify.send(Notify::VoteResponse {
                    target: self.id.clone(),
                    resp: VoteResponse::new(vote.clone(), None, true),
                    sender_vote: vote,
                });
            }
            Command::PurgeLog { upto } => {
                self.log_store.purge(upto.clone()).await?;
                self.engine.state.io_state_mut().update_purged(Some(upto));
            }
            Command::DeleteConflictLog { since } => {
                self.log_store.truncate(since.clone()).await?;

                let removed = self.client_resp_channels.split_off(&since.index);
                if !removed.is_empty() {
                    let leader_id = self.current_leader();
                    let leader_node = self.get_leader_node(leader_id.clone());

                    #[allow(clippy::let_underscore_future)]
                    let _ = C::AsyncRuntime::spawn(async move {
                        for (log_index, tx) in removed.into_iter() {
                            tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
                                leader_id: leader_id.clone(),
                                leader_node: leader_node.clone(),
                            })));

                            tracing::debug!("sent ForwardToLeader for log_index: {}", log_index);
                        }
                    });
                }
            }
            Command::SendVote { vote_req } => {
                self.spawn_parallel_vote_requests(&vote_req).await;
            }
            Command::ReplicateCommitted { committed } => {
                for node in self.replications.values() {
                    let _ = node.tx_repl.send(Replicate::Committed(committed.clone()));
                }
            }
            Command::Commit {
                seq,
                ref already_committed,
                ref upto,
            } => {
                self.log_store.save_committed(Some(upto.clone())).await?;
                self.apply_to_state_machine(seq, already_committed.next_index(), upto.index).await?;
            }
            Command::Replicate { req, target } => {
                let node = self.replications.get(&target).expect("replication to target node exists");

                match req {
                    Inflight::None => {
                        let _ = node.tx_repl.send(Replicate::Heartbeat);
                    }
                    Inflight::Logs { id, log_id_range } => {
                        let _ = node.tx_repl.send(Replicate::logs(RequestId::new_append_entries(id), log_id_range));
                    }
                    Inflight::Snapshot { id, last_log_id } => {
                        node.tx_repl.send(Replicate::snapshot(RequestId::new_snapshot(id), last_log_id)).map_err(
                            |_e| StorageIOError::read_snapshot(None, AnyError::error("replication channel closed")),
                        )?;
                    }
                }
            }
            Command::RebuildReplicationStreams { targets } => {
                self.remove_all_replication().await;

                for (target, matching) in targets.iter() {
                    let handle = self.spawn_replication_stream(target.clone(), matching.clone()).await;
                    self.replications.insert(target.clone(), handle);
                }
            }
            Command::StateMachine { command } => {
                self.sm_handle.send(command).map_err(|_e| {
                    StorageIOError::write_state_machine(AnyError::error("can not send to sm::Worker".to_string()))
                })?;
            }
            Command::Respond { resp: send, .. } => {
                send.send();
            }
        }

        Ok(None)
    }
}