1use std::collections::HashMap;
2use std::env;
3use std::time::Duration;
4
5use crate::client::{RpcRequest, RECONNECT_TIMEOUT_UPPER};
6use crate::rpc::cache;
7use crate::server::CacheMap;
8use crate::{get_rand_between, rpc, CacheError, CacheReq};
9use chrono::Utc;
10use lazy_static::lazy_static;
11use tokio::sync::{oneshot, watch};
12use tokio::time;
13use tracing::{debug, error, info, warn};
14
15lazy_static! {
16 static ref ELECTION_TIMEOUT: u64 = {
17 let t = env::var("CACHE_ELECTION_TIMEOUT")
18 .unwrap_or_else(|_| String::from("2"))
19 .parse::<u64>()
20 .expect("Error parsing 'CACHE_ELECTION_TIMEOUT' to u64");
21 t * 1000
22 };
23}
24
/// Acknowledgement level that can be attached to a cache operation.
///
/// Each variant maps 1:1 onto the protobuf `cache::AckLevel` message via
/// `AckLevel::get_rpc_value` / `AckLevel::from_rpc_value`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AckLevel {
    /// Maps to `LevelQuorum` on the wire.
    Quorum,
    /// Maps to `LevelOnce` on the wire.
    Once,
    /// Maps to `LevelLeader` on the wire.
    Leader,
}
32
33impl AckLevel {
34 pub fn get_rpc_value(&self) -> rpc::cache::AckLevel {
35 match self {
36 AckLevel::Quorum => cache::AckLevel {
37 ack_level: Some(cache::ack_level::AckLevel::LevelQuorum(cache::Empty {})),
38 },
39 AckLevel::Once => cache::AckLevel {
40 ack_level: Some(cache::ack_level::AckLevel::LevelOnce(cache::Empty {})),
41 },
42 AckLevel::Leader => cache::AckLevel {
43 ack_level: Some(cache::ack_level::AckLevel::LevelLeader(cache::Empty {})),
44 },
45 }
46 }
47
48 pub fn from_rpc_value(value: Option<cache::AckLevel>) -> Self {
49 match value.unwrap().ack_level.unwrap() {
50 cache::ack_level::AckLevel::LevelQuorum(_) => Self::Quorum,
51 cache::ack_level::AckLevel::LevelOnce(_) => Self::Once,
52 cache::ack_level::AckLevel::LevelLeader(_) => Self::Leader,
53 }
54 }
55}
56
57#[derive(Debug, Clone, Default)]
59pub struct QuorumHealthState {
60 pub health: QuorumHealth,
61 pub state: QuorumState,
62 pub tx_leader: Option<flume::Sender<RpcRequest>>,
63 pub connected_hosts: usize,
64}
65
66impl QuorumHealthState {
67 pub fn is_quorum_good(&self) -> Result<(), CacheError> {
68 match self.health {
69 QuorumHealth::Good => Ok(()),
70 QuorumHealth::Degraded => Ok(()),
71 QuorumHealth::Bad => Err(CacheError {
72 error: "QuorumHealth::Bad - cannot operate the HA cache".to_string(),
73 }),
74 }
75 }
76}
77
78#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
80pub enum QuorumHealth {
81 Good,
82 Degraded,
83 Bad,
84}
85
86impl Default for QuorumHealth {
87 fn default() -> Self {
88 Self::Bad
89 }
90}
91
92#[derive(Debug, Clone, PartialEq)]
94pub struct QuorumReport {
95 pub health: QuorumHealth,
96 pub state: QuorumState,
97 pub leader: Option<RegisteredLeader>,
98 pub hosts: Vec<RpcServer>,
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
103pub enum QuorumState {
104 Leader,
105 LeaderDead,
106 LeaderSwitch,
107 LeaderTxAwait(String),
108 LeadershipRequested(i64),
109 Follower,
110 Undefined,
111 Retry,
112}
113
114impl Default for QuorumState {
115 fn default() -> Self {
116 Self::Undefined
117 }
118}
119
120#[derive(Debug)]
123pub(crate) enum QuorumReq {
124 UpdateServer {
125 server: RpcServer,
126 },
127 GetReport {
128 tx: oneshot::Sender<QuorumReport>,
129 },
130 LeaderInfo {
131 addr: String,
132 election_ts: i64,
133 has_quorum: bool,
134 },
135 LeaderReq {
136 req: cache::LeaderReq,
137 tx: oneshot::Sender<bool>,
138 },
139 LeaderReqAck {
140 addr: String,
141 election_ts: i64,
142 },
143 LeaderAck {
144 ack: cache::LeaderAck,
145 },
146 LeaderSwitch {
147 req: cache::LeaderSwitch,
148 },
149 LeaderSwitchDead {
150 req: cache::LeaderDead,
151 },
152 LeaderSwitchAck {
153 req: cache::LeaderSwitchAck,
154 },
155 LeaderSwitchPriority {
156 req: cache::LeaderSwitchPriority,
157 },
158 CheckElectionTimeout,
159 HostShutdown,
160 #[allow(dead_code)]
162 Print,
163}
164
165#[derive(Debug, Clone, PartialEq)]
167pub enum RegisteredLeader {
168 Local(i64),
169 Remote(RpcServer),
170}
171
172#[derive(Debug, Clone)]
175pub struct RpcServer {
176 pub address: String,
177 pub state: RpcServerState,
178 pub tx: Option<flume::Sender<RpcRequest>>,
179 pub election_ts: i64,
180}
181
182impl PartialEq for RpcServer {
183 fn eq(&self, other: &Self) -> bool {
184 other.address == self.address
185 }
186}
187
/// Connection state of a remote host.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RpcServerState {
    /// The connection to the host is established.
    Alive,
    /// The connection to the host was lost.
    Dead,
}
194
195pub(crate) async fn quorum_handler(
206 tx_quorum: flume::Sender<QuorumReq>,
207 tx_watch: watch::Sender<Option<QuorumHealthState>>,
208 rx: flume::Receiver<QuorumReq>,
209 tx_remote: flume::Sender<RpcRequest>,
210 clients_count: usize,
211 cache_map: CacheMap,
212 whoami: String,
213) -> anyhow::Result<()> {
214 let quorum = (clients_count + 1) / 2;
215
216 let mut hosts = HashMap::with_capacity(clients_count);
217 let mut health = QuorumHealth::Bad;
218 let mut state = QuorumState::Undefined;
219 let mut health_state = QuorumHealthState {
220 health: QuorumHealth::Bad,
221 state: QuorumState::Undefined,
222 tx_leader: None,
223 connected_hosts: 0,
224 };
225 let mut leader: Option<RegisteredLeader> = None;
226 let mut ack_count = 0;
227
228 info!(
229 "HA Cache Cluster quorum handler is running. Quorum needed for a healthy cluster: {}",
230 quorum + 1
231 );
232
233 loop {
234 debug!("loop iteration in quorum_handler");
235
236 let req = rx.recv_async().await;
237 if req.is_err() {
238 warn!("Received 'None' in 'quorum_handler' - Exiting");
239 break;
240 }
241
242 match req.unwrap() {
243 QuorumReq::UpdateServer { server } => {
245 match server.state {
246 RpcServerState::Alive => {
247 info!("Cache client connected successfully: {}", server.address);
248
249 if let QuorumState::LeaderTxAwait(addr) = &state {
250 if addr == &server.address {
251 let election_ts = leader
252 .map(|l| match l {
253 RegisteredLeader::Local(ts) => ts,
254 RegisteredLeader::Remote(r) => r.election_ts,
255 })
256 .unwrap();
257 let ack = RpcRequest::LeaderReqAck {
258 addr: addr.clone(),
259 election_ts,
260 };
261 if let Some(tx) = &server.tx {
262 if let Err(err) = tx.send_async(ack).await {
263 error!("{:?}", CacheError::from(&err));
264 }
265 }
266
267 leader = Some(RegisteredLeader::Remote(RpcServer {
268 address: addr.clone(),
269 state: RpcServerState::Alive,
270 tx: server.tx.clone(),
271 election_ts,
272 }));
273
274 state = QuorumState::Follower;
275 health_state.state = QuorumState::Follower;
276 health_state.tx_leader.clone_from(&server.tx);
277 }
278 }
279
280 if state == QuorumState::Retry && leader.is_some() {
281 let lead = leader.clone().unwrap();
282 match lead {
283 RegisteredLeader::Local(_) => {
284 unreachable!(
285 "QuorumReq::UpdateServer -> state == QuorumState::\
286 Retry && leader.is_some() -> RegisteredLeader::Local"
287 )
288 }
289 RegisteredLeader::Remote(l) => {
290 if l.address == server.address {
291 leader = Some(RegisteredLeader::Remote(RpcServer {
292 address: server.address.clone(),
293 state: RpcServerState::Alive,
294 tx: server.tx.clone(),
295 election_ts: l.election_ts,
296 }));
297
298 state = QuorumState::Follower;
299 health_state.state = QuorumState::Follower;
300 health_state.tx_leader.clone_from(&server.tx);
301
302 tx_remote
303 .send_async(RpcRequest::LeaderReqAck {
304 addr: l.address,
305 election_ts: l.election_ts,
306 })
307 .await
308 .unwrap();
309 }
310 }
311 }
312 }
313
314 hosts.insert(server.address.clone(), server);
315
316 let count = hosts.len();
317 if count >= quorum {
318 match health {
319 QuorumHealth::Good => {}
320 QuorumHealth::Bad | QuorumHealth::Degraded => {
321 info!("QuorumHealth changed from 'Bad | Degraded' to 'Good'");
322
323 if leader.is_none() {
327 debug!("No registered leader - requesting the leadership");
328 let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
330 leader = Some(RegisteredLeader::Local(election_ts));
331 state = QuorumState::LeadershipRequested(election_ts);
332 health_state.state =
333 QuorumState::LeadershipRequested(election_ts);
334 health_state.tx_leader = None;
335
336 ack_count = 0;
337
338 if let Err(err) = tx_remote
339 .send_async(RpcRequest::LeaderReq {
340 addr: whoami.clone(),
341 election_ts,
342 })
343 .await
344 {
345 error!("{:?}", CacheError::from(&err));
346 }
347 election_timeout_handler(tx_quorum.clone(), false);
348 }
349 }
350 }
351
352 health = if count == clients_count {
353 QuorumHealth::Good
354 } else {
355 QuorumHealth::Degraded
356 };
357 health_state.health = health.clone();
358 }
359 }
360
361 RpcServerState::Dead => {
362 debug!("Lost connection with cache client: {}", server.address);
363
364 hosts.remove(&server.address);
365
366 let count = hosts.len();
367 let mut leader_died = false;
368 if let Some(l) = leader.as_ref() {
369 match l {
370 RegisteredLeader::Local(_) => {}
371 RegisteredLeader::Remote(lead) => {
372 if lead.address == server.address && count >= quorum {
373 ack_count = 0;
374 leader_died = true;
375 }
376 }
377 }
378 }
379
380 if count >= quorum {
381 health = QuorumHealth::Degraded;
382 health_state.health = health.clone();
383 warn!("QuorumHealth changed from 'Good' to 'Degraded'");
384
385 if leader_died {
386 if state == QuorumState::LeaderDead {
388 continue;
389 }
390
391 time::sleep(Duration::from_millis(get_rand_between(0, 100))).await;
393
394 let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
396 tx_remote
397 .send_async(RpcRequest::LeaderSwitchDead {
398 vote_host: whoami.clone(),
399 election_ts,
400 })
401 .await
402 .unwrap();
403
404 leader = Some(RegisteredLeader::Local(election_ts));
405 state = QuorumState::LeadershipRequested(election_ts);
406 health_state.state = QuorumState::LeadershipRequested(election_ts);
407 health_state.tx_leader = None;
408
409 warn!("The current leader died - starting a new election");
410
411 election_timeout_handler(tx_quorum.clone(), false);
412 }
413 } else {
414 match health {
415 QuorumHealth::Good | QuorumHealth::Degraded => {
416 warn!("QuorumHealth changed from 'Good | Degraded' to 'Bad' - resetting local cache");
417 leader = None;
418 health = QuorumHealth::Bad;
419 state = QuorumState::Undefined;
420 health_state.health = QuorumHealth::Bad;
421 health_state.state = QuorumState::Undefined;
422 health_state.tx_leader = None;
423
424 ack_count = 0;
425 for tx in cache_map.values() {
426 tx.send_async(CacheReq::Reset).await?;
427 }
428 }
429 QuorumHealth::Bad => {}
430 }
431 }
432 }
433 }
434
435 health_state.connected_hosts = hosts.len();
436 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
437 }
438
439 QuorumReq::GetReport { tx } => {
440 let mut hosts_vec = vec![];
441 for host in hosts.values() {
442 hosts_vec.push(host.clone());
443 }
444 let report = QuorumReport {
445 health: health.clone(),
446 state: state.clone(),
447 leader: leader.clone(),
448 hosts: hosts_vec,
449 };
450
451 match tx.send(report) {
452 Ok(_) => {}
453 Err(err) => {
454 error!("Error sending back QuorumReport value: {:?}", err);
455 }
456 }
457 }
458
459 QuorumReq::LeaderInfo {
460 addr,
461 election_ts,
462 has_quorum,
463 } => {
464 debug!(
465 "{} received QuorumReq::LeaderInfo: {:?} - quorum: {}",
466 whoami, addr, has_quorum
467 );
468
469 check_leadership_conflict(
470 &leader,
471 &tx_quorum,
472 election_ts,
473 &addr,
474 &whoami,
475 &tx_remote,
476 )
477 .await;
478
479 if (has_quorum
480 && (state == QuorumState::Undefined || state == QuorumState::Retry)
481 && addr != whoami)
482 || leader.is_none()
483 {
484 match get_leader_tx(&hosts, &addr).await {
485 None => {
486 debug!("Received LeaderInfo with quorum, but have no TX yet");
487 leader = Some(RegisteredLeader::Remote(RpcServer {
488 address: addr.clone(),
489 state: RpcServerState::Alive,
490 tx: None,
491 election_ts,
492 }));
493 state = QuorumState::LeaderTxAwait(addr.clone());
494 health_state.state = QuorumState::LeaderTxAwait(addr);
495 }
496 Some(tx) => {
497 let ack = RpcRequest::LeaderReqAck {
498 addr: addr.clone(),
499 election_ts,
500 };
501 if let Err(err) = tx.send_async(ack).await {
502 error!("{:?}", CacheError::from(&err));
503 }
504
505 leader = Some(RegisteredLeader::Remote(RpcServer {
506 address: addr,
507 state: RpcServerState::Alive,
508 tx: Some(tx.clone()),
509 election_ts,
510 }));
511 state = QuorumState::Follower;
512 health_state.state = QuorumState::Follower;
513 health_state.tx_leader = Some(tx);
514 }
515 };
516
517 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
518 }
519 }
520
521 QuorumReq::LeaderReq { req, tx } => {
522 let mut accepted = false;
523
524 let is_conflict = check_leadership_conflict(
528 &leader,
529 &tx_quorum,
530 req.election_ts,
531 &req.addr,
532 &whoami,
533 &tx_remote,
534 )
535 .await;
536
537 if !is_conflict {
538 let tx_leader = get_leader_tx(&hosts, &req.addr).await;
539 if tx_leader.is_some() {
540 leader = Some(RegisteredLeader::Remote(RpcServer {
541 address: req.addr,
542 state: RpcServerState::Alive,
543 tx: tx_leader.clone(),
544 election_ts: req.election_ts,
545 }));
546 health_state.tx_leader = tx_leader;
547
548 accepted = true;
549
550 debug!(
551 "Registered new leader in QuorumReq::LeaderReq: {:?}",
552 leader
553 );
554
555 election_timeout_handler(tx_quorum.clone(), false);
558 } else {
559 debug!(
560 "Received QuorumReq::LeaderReq for not yet connected client: {} - trying again",
561 req.addr
562 );
563
564 leader = Some(RegisteredLeader::Remote(RpcServer {
565 address: req.addr,
566 state: RpcServerState::Alive,
567 tx: None,
568 election_ts: req.election_ts,
569 }));
570
571 state = QuorumState::Retry;
572 health_state.state = QuorumState::Retry;
573 health_state.tx_leader = None;
574
575 election_timeout_handler(tx_quorum.clone(), true);
578 }
579
580 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
581 };
582
583 match tx.send(accepted) {
584 Ok(_) => {}
585 Err(err) => {
586 error!("Error sending back LeaderReq result: {:?}", err);
587 }
588 }
589 }
590
591 QuorumReq::LeaderReqAck { addr, election_ts } => {
592 debug!("{} received QuorumReq::LeaderReqAck for {}", whoami, addr);
593
594 match &leader {
595 None => {
596 let tx_leader = get_leader_tx(&hosts, &addr).await;
599 leader = Some(RegisteredLeader::Remote(RpcServer {
600 address: addr.clone(),
601 state: RpcServerState::Alive,
602 tx: tx_leader.clone(),
603 election_ts,
604 }));
605 state = QuorumState::LeaderTxAwait(addr.clone());
606 health_state.state = QuorumState::LeaderTxAwait(addr.clone());
607 health_state.tx_leader = tx_leader;
608
609 tx_remote
610 .send_async(RpcRequest::LeaderReqAck { addr, election_ts })
611 .await
612 .unwrap();
613
614 election_timeout_handler(tx_quorum.clone(), false);
615 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
616 }
617
618 Some(lead) => {
619 check_leadership_conflict(
620 &leader,
621 &tx_quorum,
622 election_ts,
623 &addr,
624 &whoami,
625 &tx_remote,
626 )
627 .await;
628
629 match lead {
630 RegisteredLeader::Local(ts) => {
631 if addr == whoami {
632 ack_count += 1;
633 debug!(
634 "Received a LeaderReqAck from {} and increasing counter to {}",
635 addr, ack_count
636 );
637
638 if ack_count >= quorum {
639 state = QuorumState::Leader;
640 health_state.state = QuorumState::Leader;
641 health_state.tx_leader = None;
642
643 if let Err(err) = tx_remote
644 .send_async(RpcRequest::LeaderAck {
645 addr: whoami.clone(),
646 election_ts: *ts,
647 })
648 .await
649 {
650 error!("{:?}", CacheError::from(&err));
651 }
652 }
653
654 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
655 } else {
656 debug!("Received a LeaderReqAck for remote Server {}", addr);
657 }
658 }
659 RegisteredLeader::Remote(_) => {
660 debug!("Received a LeaderReqAck for remote Server {}", addr);
661 }
662 }
663 }
664 }
665 }
666
667 QuorumReq::LeaderAck { ack } => {
668 match get_leader_tx(&hosts, &ack.addr).await {
669 None => {
670 leader = Some(RegisteredLeader::Remote(RpcServer {
671 address: ack.addr.clone(),
672 state: RpcServerState::Alive,
673 tx: None,
674 election_ts: ack.election_ts,
675 }));
676
677 state = QuorumState::LeaderTxAwait(ack.addr.clone());
678 health_state.state = QuorumState::LeaderTxAwait(ack.addr);
679 health_state.tx_leader = None;
680 }
681 Some(tx) => {
682 leader = Some(RegisteredLeader::Remote(RpcServer {
683 address: ack.addr,
684 state: RpcServerState::Alive,
685 tx: Some(tx.clone()),
686 election_ts: ack.election_ts,
687 }));
688
689 state = QuorumState::Follower;
690 health_state.state = QuorumState::Follower;
691 health_state.tx_leader = Some(tx);
692 }
693 }
694
695 ack_count = 0;
697
698 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
699 }
700
701 QuorumReq::LeaderSwitch { req } => {
702 debug!("QuorumReq::LeaderSwitch to: {}", req.vote_host);
703
704 let addr = req.vote_host.clone();
705
706 if whoami == req.vote_host {
707 info!("Taking over the leadership after QuorumReq::LeaderSwitch");
708 ack_count = 0;
709 leader = Some(RegisteredLeader::Local(req.election_ts));
710 health_state.state = QuorumState::LeaderSwitch;
711 health_state.tx_leader = None;
712 } else {
713 let tx_leader = get_leader_tx(&hosts, &req.vote_host).await;
714 leader = Some(RegisteredLeader::Remote(RpcServer {
715 address: req.vote_host,
716 state: RpcServerState::Alive,
717 tx: tx_leader.clone(),
718 election_ts: req.election_ts,
719 }));
720 health_state.state = QuorumState::LeaderSwitch;
721 health_state.tx_leader = tx_leader;
722 }
723
724 state = QuorumState::LeaderSwitch;
725
726 if let Err(err) = tx_remote
727 .send_async(RpcRequest::LeaderSwitchAck { addr })
728 .await
729 {
730 error!("{:?}", CacheError::from(&err));
731 }
732
733 election_timeout_handler(tx_quorum.clone(), false);
734 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
735 }
736
737 QuorumReq::LeaderSwitchDead { req } => {
738 debug!("QuorumReq::LeaderSwitchDead to: {}", req.vote_host);
739
740 let tx_leader = get_leader_tx(&hosts, &req.vote_host).await;
742 leader = Some(RegisteredLeader::Remote(RpcServer {
743 address: req.vote_host.clone(),
744 state: RpcServerState::Alive,
745 tx: tx_leader.clone(),
746 election_ts: req.election_ts,
747 }));
748 state = QuorumState::LeaderDead;
749 health_state.state = QuorumState::LeaderDead;
750 health_state.tx_leader = tx_leader;
751
752 tx_remote
753 .send_async(RpcRequest::LeaderReqAck {
754 addr: req.vote_host,
755 election_ts: req.election_ts,
756 })
757 .await
758 .unwrap();
759
760 election_timeout_handler(tx_quorum.clone(), false);
761 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
762 }
763
764 QuorumReq::LeaderSwitchAck { req } => match &leader {
766 None => unreachable!(),
767 Some(lead) => match lead {
768 RegisteredLeader::Local(election_ts) => {
769 if req.addr == whoami {
770 ack_count += 1;
771 debug!(
772 "Received a LeaderSwitchAck: {:?}, and increasing counter to {}",
773 req, ack_count
774 );
775
776 if ack_count >= quorum {
777 state = QuorumState::Leader;
778 health_state.state = QuorumState::Leader;
779 if let Err(err) = tx_remote
780 .send_async(RpcRequest::LeaderAck {
781 addr: whoami.clone(),
782 election_ts: *election_ts,
783 })
784 .await
785 {
786 error!("{:?}", CacheError::from(&err));
787 }
788 }
789
790 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
791 } else {
792 debug!("Received a LeaderSwitchAck: {:?}", req);
793 }
794 }
795 RegisteredLeader::Remote(_) => {
796 debug!("Received a LeaderSwitchAck to remote: {:?}", req);
797 }
798 },
799 },
800
801 QuorumReq::LeaderSwitchPriority { req } => {
802 debug!("{}: QuorumReq::LeaderSwitchPriority to: {:?}", whoami, req);
803
804 let do_switch = match &state {
805 QuorumState::Leader => {
806 if let Some(RegisteredLeader::Local(ts)) = &leader {
808 req.election_ts < *ts
809 } else {
810 unreachable!("If we are the leader, leader can only be Some(RegisteredLeader::Local(_))")
811 }
812 }
813 QuorumState::LeaderDead => true,
814 QuorumState::LeaderSwitch => true,
815 QuorumState::LeaderTxAwait(addr) => {
816 if &req.vote_host != addr {
819 if let Some(RegisteredLeader::Remote(lead)) = &leader {
820 req.election_ts < lead.election_ts
821 } else {
822 unreachable!("If we await a leader tx, leader can only be Some(RegisteredLeader::Remote(_))")
823 }
824 } else {
825 false
826 }
827 }
828 QuorumState::LeadershipRequested(ts) => {
829 req.election_ts < *ts
832 }
833 QuorumState::Follower => {
834 if let Some(RegisteredLeader::Remote(lead)) = &leader {
835 req.election_ts < lead.election_ts
838 } else {
839 unreachable!("If we are a follower, leader can only be Some(RegisteredLeader::Remote(_))")
840 }
841 }
842 QuorumState::Undefined => true,
843 QuorumState::Retry => true,
844 };
845
846 if do_switch {
865 if req.vote_host == whoami {
866 leader = Some(RegisteredLeader::Local(req.election_ts));
867 state = QuorumState::Leader;
868 health_state.state = QuorumState::Leader;
869 health_state.tx_leader = None;
870 } else {
871 let tx_leader = get_leader_tx(&hosts, &req.vote_host).await;
872 leader = Some(RegisteredLeader::Remote(RpcServer {
873 address: req.vote_host.clone(),
874 state: RpcServerState::Alive,
875 tx: tx_leader.clone(),
876 election_ts: req.election_ts,
877 }));
878 state = if tx_leader.is_some() {
879 QuorumState::Follower
880 } else {
881 QuorumState::LeaderTxAwait(req.vote_host.clone())
882 };
883 health_state.state = state.clone();
884 health_state.tx_leader = tx_leader;
885
886 tx_remote
887 .send_async(RpcRequest::LeaderReqAck {
888 addr: req.vote_host,
889 election_ts: req.election_ts,
890 })
891 .await
892 .unwrap();
893
894 if state == QuorumState::LeaderDead {
897 election_timeout_handler(tx_quorum.clone(), false);
898 }
899 }
900 } else {
901 debug!(
902 "Received a QuorumReq::LeaderSwitchPriority while still having a higher \
903 priority - ignoring it."
904 );
905 }
906
907 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
949 }
950
951 QuorumReq::CheckElectionTimeout => {
952 debug!("QuorumReq::CheckElectionTimeout");
953 if leader.is_none() {
954 continue;
955 }
956
957 match state {
958 QuorumState::Retry => {
959 debug!("QuorumReq::CheckElectionTimeout -> QuorumState::Retry");
960
961 state = QuorumState::Undefined;
963
964 match leader.clone().unwrap() {
965 RegisteredLeader::Local(_) => unreachable!(),
966 RegisteredLeader::Remote(l) => {
967 if let Some(tx) = get_leader_tx(&hosts, &l.address).await {
969 warn!("Retry after Cache Cluster Leader election timeout - this should not happen");
970
971 leader = Some(RegisteredLeader::Remote(RpcServer {
972 address: l.address.clone(),
973 state: RpcServerState::Alive,
974 tx: Some(tx.clone()),
975 election_ts: l.election_ts,
976 }));
977 state = QuorumState::Follower;
978 health_state.state = QuorumState::Follower;
979 health_state.tx_leader = Some(tx);
980
981 tx_remote
982 .send_async(RpcRequest::LeaderReqAck {
983 addr: l.address.clone(),
984 election_ts: l.election_ts,
985 })
986 .await
987 .unwrap();
988
989 election_timeout_handler(tx_quorum.clone(), false);
990 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
991 }
992 }
993 }
994 }
995
996 QuorumState::LeaderDead => match leader.as_ref().unwrap() {
997 RegisteredLeader::Local(_) => {
998 error!(
999 "Received QuorumState::LeaderDead when Leader == \
1000 RegisteredLeader::Local - this should never happen"
1001 );
1002 }
1003 RegisteredLeader::Remote(_) => {
1004 warn!("The Cache Cluster Leader died");
1005
1006 if state == QuorumState::LeaderDead && hosts.len() >= quorum {
1007 time::sleep(Duration::from_millis(get_rand_between(0, 100))).await;
1008 info!("Quorum is still good after cache cluster leader died - requesting leadership");
1009
1010 let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
1012 tx_remote
1013 .send_async(RpcRequest::LeaderSwitchDead {
1014 vote_host: whoami.clone(),
1015 election_ts,
1016 })
1017 .await
1018 .unwrap();
1019
1020 leader = Some(RegisteredLeader::Local(election_ts));
1021 state = QuorumState::LeadershipRequested(election_ts);
1022 health_state.state = QuorumState::LeadershipRequested(election_ts);
1023 health_state.tx_leader = None;
1024
1025 election_timeout_handler(tx_quorum.clone(), false);
1026 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
1027 }
1028 }
1029 },
1030
1031 QuorumState::LeadershipRequested(ts) => {
1032 warn!(
1033 "New Leader request has still not been validated after {} s",
1034 *ELECTION_TIMEOUT
1035 );
1036
1037 match health {
1038 QuorumHealth::Good | QuorumHealth::Degraded => {
1040 if let Err(err) = tx_remote
1041 .send_async(RpcRequest::LeaderReq {
1042 addr: whoami.clone(),
1043 election_ts: ts,
1044 })
1045 .await
1046 {
1047 error!("{:?}", CacheError::from(&err));
1048 }
1049 election_timeout_handler(tx_quorum.clone(), false);
1050 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
1051 }
1052 QuorumHealth::Bad => leader = None,
1053 }
1054 }
1055
1056 QuorumState::LeaderSwitch => {
1057 warn!(
1058 "New LeaderSwitch request has still not been validated after {} s",
1059 *ELECTION_TIMEOUT
1060 );
1061
1062 match health {
1063 QuorumHealth::Good | QuorumHealth::Degraded => {
1064 let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
1066 leader = Some(RegisteredLeader::Local(election_ts));
1067 health_state.state = QuorumState::LeadershipRequested(election_ts);
1068 health_state.tx_leader = None;
1069
1070 if let Err(err) = tx_remote
1071 .send_async(RpcRequest::LeaderReq {
1072 addr: whoami.clone(),
1073 election_ts,
1074 })
1075 .await
1076 {
1077 error!("{:?}", CacheError::from(&err));
1078 }
1079 election_timeout_handler(tx_quorum.clone(), false);
1080 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
1081 }
1082 QuorumHealth::Bad => leader = None,
1083 }
1084 }
1085
1086 _ => {}
1087 }
1088 }
1089
1090 QuorumReq::HostShutdown => {
1091 if state == QuorumState::Leader {
1092 info!("Current Cache Cluster Leader is shutting down");
1093
1094 if let Some(addr) = hosts.keys().next() {
1097 info!("Voting new leader after QuorumReq::HostShutdown: {}", addr);
1098
1099 if let Err(err) = tx_remote
1100 .send_async(RpcRequest::LeaderSwitch {
1101 vote_host: addr.clone(),
1102 election_ts: Utc::now().timestamp_nanos_opt().unwrap(),
1104 })
1105 .await
1106 {
1107 debug!("Error sending RpcRequest::LeaderLeader: {}", err);
1108 }
1109 }
1110 }
1111
1112 let _ = tx_remote.send_async(RpcRequest::Shutdown).await;
1113
1114 time::sleep(Duration::from_secs(1)).await;
1115 warn!("Exiting HA cache quorum handler");
1116 break;
1117 }
1118
1119 QuorumReq::Print => {
1120 log_cluster_info(&leader, &hosts, &whoami, &health, &state);
1121 }
1122 }
1123
1124 if let Err(err) = tx_watch.send(Some(QuorumHealthState {
1125 health: health.clone(),
1126 state: state.clone(),
1127 tx_leader: health_state.tx_leader.clone(),
1128 connected_hosts: hosts.len(),
1129 })) {
1130 debug!("'tx_watch' update error: {}", err);
1131 }
1132 }
1133
1134 Ok(())
1135}
1136
1137#[inline]
1140async fn check_leadership_conflict(
1141 curr_leader: &Option<RegisteredLeader>,
1142 tx_quorum: &flume::Sender<QuorumReq>,
1143 ts_remote: i64,
1144 addr_remote: &str,
1145 whoami: &str,
1146 tx_remote: &flume::Sender<RpcRequest>,
1147) -> bool {
1148 let res = if let Some(lead) = curr_leader {
1150 match lead {
1151 RegisteredLeader::Local(ts) => Some((ts, whoami)),
1152 RegisteredLeader::Remote(srv) => Some((&srv.election_ts, srv.address.as_str())),
1153 }
1154 } else {
1155 None
1156 };
1157
1158 if let Some((ts, addr)) = res {
1159 let req = if &ts_remote < ts {
1160 info!(
1161 "{} received new LeaderReq with higher priority - switching to {}",
1162 whoami, addr_remote
1163 );
1164
1165 let _ = tx_quorum
1168 .send_async(QuorumReq::LeaderSwitchPriority {
1169 req: cache::LeaderSwitchPriority {
1170 vote_host: addr_remote.to_string(),
1171 election_ts: ts_remote,
1172 },
1173 })
1174 .await;
1175
1176 RpcRequest::LeaderSwitchPriority {
1177 vote_host: addr_remote.to_string(),
1178 election_ts: ts_remote,
1179 }
1180 } else {
1181 debug!(
1185 "{} received new LeaderReq with lower priority - sending out switch requests",
1186 whoami
1187 );
1188
1189 RpcRequest::LeaderSwitchPriority {
1190 vote_host: addr.to_string(),
1191 election_ts: *ts,
1192 }
1193 };
1194
1195 if let Err(err) = tx_remote.send_async(req).await {
1196 error!("Error sending RpcRequest::LeaderLeader: {}", err);
1197 }
1198
1199 return true;
1200 }
1201 false
1202}
1203
1204#[inline]
1205async fn get_leader_tx(
1206 hosts: &HashMap<String, RpcServer>,
1207 addr: &str,
1208) -> Option<flume::Sender<RpcRequest>> {
1209 if let Some(host) = hosts.get(addr) {
1210 if host.tx.is_some() {
1211 return host.tx.clone();
1212 }
1213 warn!("Host TX is not available in QuorumReq::LeaderReq when it should be - sleeping 1 sec and trying again");
1214 }
1215 None
1216}
1217
1218fn election_timeout_handler(tx_quorum: flume::Sender<QuorumReq>, retry: bool) {
1221 let timeout = if retry {
1222 *RECONNECT_TIMEOUT_UPPER + 100
1223 } else {
1224 *ELECTION_TIMEOUT
1225 };
1226 tokio::spawn(async move {
1227 tokio::time::sleep(Duration::from_millis(timeout)).await;
1228 let _ = tx_quorum.send_async(QuorumReq::CheckElectionTimeout).await;
1230 });
1231}
1232
1233fn log_cluster_info(
1234 leader: &Option<RegisteredLeader>,
1235 clients: &HashMap<String, RpcServer>,
1236 whoami: &str,
1237 health: &QuorumHealth,
1238 state: &QuorumState,
1239) {
1240 let mut lead = if let Some(leader) = leader {
1241 match leader {
1242 RegisteredLeader::Local(_) => whoami,
1243 RegisteredLeader::Remote(l) => &l.address,
1244 }
1245 } else {
1246 "None"
1247 };
1248 if state == &QuorumState::Undefined || state == &QuorumState::Retry {
1249 lead = "Election in progress";
1250 }
1251
1252 let mut clients_list = Vec::with_capacity(clients.len());
1254 for c in clients {
1255 clients_list.push(c.0);
1256 }
1257
1258 debug!(
1259 r#"
1260
1261 Cluster Leader: {}
1262 This Host: {}
1263 Host State: {:?}
1264 Clients: {:?}
1265 Quorum Health: {:?}
1266 "#,
1267 lead, whoami, state, clients_list, health
1268 )
1269}