redhac/quorum.rs

use std::collections::HashMap;
use std::env;
use std::time::Duration;

use crate::client::{RpcRequest, RECONNECT_TIMEOUT_UPPER};
use crate::rpc::cache;
use crate::server::CacheMap;
use crate::{get_rand_between, rpc, CacheError, CacheReq};
use chrono::Utc;
use lazy_static::lazy_static;
use tokio::sync::{oneshot, watch};
use tokio::time;
use tracing::{debug, error, info, warn};

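// The election timeout is configured in seconds via the `CACHE_ELECTION_TIMEOUT` env var
// (default: 2) and stored in milliseconds, e.g. `CACHE_ELECTION_TIMEOUT=5` -> 5000 ms.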
lazy_static! {
    static ref ELECTION_TIMEOUT: u64 = {
        let t = env::var("CACHE_ELECTION_TIMEOUT")
            .unwrap_or_else(|_| String::from("2"))
            .parse::<u64>()
            .expect("Error parsing 'CACHE_ELECTION_TIMEOUT' to u64");
        t * 1000
    };
}

/// The `AckLevel` is a QoS indicator for HA cache modifying operations.
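///
/// A minimal sketch of the round-trip between this enum and its RPC representation
/// (using the generated `cache` protobuf types of this crate):
///
/// ```ignore
/// let level = AckLevel::Quorum;
/// let rpc_value = level.get_rpc_value();
/// assert_eq!(AckLevel::from_rpc_value(Some(rpc_value)), AckLevel::Quorum);
/// ```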
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AckLevel {
    Quorum,
    Once,
    Leader,
}

impl AckLevel {
    pub fn get_rpc_value(&self) -> rpc::cache::AckLevel {
        match self {
            AckLevel::Quorum => cache::AckLevel {
                ack_level: Some(cache::ack_level::AckLevel::LevelQuorum(cache::Empty {})),
            },
            AckLevel::Once => cache::AckLevel {
                ack_level: Some(cache::ack_level::AckLevel::LevelOnce(cache::Empty {})),
            },
            AckLevel::Leader => cache::AckLevel {
                ack_level: Some(cache::ack_level::AckLevel::LevelLeader(cache::Empty {})),
            },
        }
    }

    pub fn from_rpc_value(value: Option<cache::AckLevel>) -> Self {
        match value.unwrap().ack_level.unwrap() {
            cache::ack_level::AckLevel::LevelQuorum(_) => Self::Quorum,
            cache::ack_level::AckLevel::LevelOnce(_) => Self::Once,
            cache::ack_level::AckLevel::LevelLeader(_) => Self::Leader,
        }
    }
}

/// Only used with `HA_MODE` == true
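///
/// The quorum handler publishes the current value of this state via an internal `watch`
/// channel after every processed request, so other components can observe it.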
#[derive(Debug, Clone, Default)]
pub struct QuorumHealthState {
    pub health: QuorumHealth,
    pub state: QuorumState,
    pub tx_leader: Option<flume::Sender<RpcRequest>>,
    pub connected_hosts: usize,
}

impl QuorumHealthState {
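    /// Returns `Ok(())` while the quorum is `Good` or `Degraded` and a `CacheError` once it
    /// is `Bad`, so callers can bail out of HA cache operations early, e.g. (hypothetical
    /// usage) `health_state.is_quorum_good()?;`.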
    pub fn is_quorum_good(&self) -> Result<(), CacheError> {
        match self.health {
            QuorumHealth::Good => Ok(()),
            QuorumHealth::Degraded => Ok(()),
            QuorumHealth::Bad => Err(CacheError {
                error: "QuorumHealth::Bad - cannot operate the HA cache".to_string(),
            }),
        }
    }
}

/// Indicator whether the QuorumHealth is Good, Degraded, or Bad. Only used with `HA_MODE` == true
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum QuorumHealth {
    Good,
    Degraded,
    Bad,
}

impl Default for QuorumHealth {
    fn default() -> Self {
        Self::Bad
    }
}

/// Returns a full health state. Only used with `HA_MODE` == true
#[derive(Debug, Clone, PartialEq)]
pub struct QuorumReport {
    pub health: QuorumHealth,
    pub state: QuorumState,
    pub leader: Option<RegisteredLeader>,
    pub hosts: Vec<RpcServer>,
}

/// Whether this host is the cache Leader, a Follower, or in one of the transitional states
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum QuorumState {
    /// This host is the current cache cluster leader
    Leader,
    /// The remote leader died and a switch to a new leader has been acked
    LeaderDead,
    /// A graceful switch to another leader is in progress
    LeaderSwitch,
    /// The leader at the given address is known, but its TX channel is not available yet
    LeaderTxAwait(String),
    /// This host has requested the leadership with the given election timestamp
    LeadershipRequested(i64),
    /// This host follows a remote leader
    Follower,
    /// No leader is known (default / quorum lost)
    Undefined,
    /// Retry after a race condition during startup
    Retry,
}

impl Default for QuorumState {
    fn default() -> Self {
        Self::Undefined
    }
}

/// Enum for the communication between all necessary components when `HA_MODE` == true.
/// `Print` prints out information about the current quorum and connected clients / servers.
#[derive(Debug)]
pub(crate) enum QuorumReq {
    UpdateServer {
        server: RpcServer,
    },
    GetReport {
        tx: oneshot::Sender<QuorumReport>,
    },
    LeaderInfo {
        addr: String,
        election_ts: i64,
        has_quorum: bool,
    },
    LeaderReq {
        req: cache::LeaderReq,
        tx: oneshot::Sender<bool>,
    },
    LeaderReqAck {
        addr: String,
        election_ts: i64,
    },
    LeaderAck {
        ack: cache::LeaderAck,
    },
    LeaderSwitch {
        req: cache::LeaderSwitch,
    },
    LeaderSwitchDead {
        req: cache::LeaderDead,
    },
    LeaderSwitchAck {
        req: cache::LeaderSwitchAck,
    },
    LeaderSwitchPriority {
        req: cache::LeaderSwitchPriority,
    },
    CheckElectionTimeout,
    HostShutdown,
    // dead code inside this lib, but may be used from the outside to trigger a status log print
    #[allow(dead_code)]
    Print,
}

/// The registered Leader inside the quorum handler
#[derive(Debug, Clone, PartialEq)]
pub enum RegisteredLeader {
    Local(i64),
    Remote(RpcServer),
}

/// Used for communication with the internal 'quorum_handler' when a new client connects
/// to the server.
#[derive(Debug, Clone)]
pub struct RpcServer {
    pub address: String,
    pub state: RpcServerState,
    pub tx: Option<flume::Sender<RpcRequest>>,
    pub election_ts: i64,
}

impl PartialEq for RpcServer {
    fn eq(&self, other: &Self) -> bool {
        other.address == self.address
    }
}

/// State for new cache clients: either `Alive` or `Dead`
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RpcServerState {
    Alive,
    Dead,
}

/**
Handles everything quorum-related.
The 3 main cache functions [cache_get!](cache_get!), [cache_put](cache_put), [cache_del](cache_del)
check back with this handler every time to see whether the quorum is Good or Bad.
Additionally, this handler can print out the current state of the quorum and connected clients.

The most important thing it does is probably the cache invalidation when the quorum changes from
`Good` to `Bad`. If this instance loses quorum, every value is deleted from the local cache to
make sure no cached values are used which may already have been invalidated somewhere else.
 */
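/// The quorum threshold is derived from the number of configured remote hosts:
/// `quorum = (clients_count + 1) / 2`. For example, in a 3 node cluster (2 remote clients per
/// host) this yields `1`, so this host plus 1 remote host - 2 out of 3 nodes - are required for
/// a healthy cluster.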
pub(crate) async fn quorum_handler(
    tx_quorum: flume::Sender<QuorumReq>,
    tx_watch: watch::Sender<Option<QuorumHealthState>>,
    rx: flume::Receiver<QuorumReq>,
    tx_remote: flume::Sender<RpcRequest>,
    clients_count: usize,
    cache_map: CacheMap,
    whoami: String,
) -> anyhow::Result<()> {
    let quorum = (clients_count + 1) / 2;

    let mut hosts = HashMap::with_capacity(clients_count);
    let mut health = QuorumHealth::Bad;
    let mut state = QuorumState::Undefined;
    let mut health_state = QuorumHealthState {
        health: QuorumHealth::Bad,
        state: QuorumState::Undefined,
        tx_leader: None,
        connected_hosts: 0,
    };
    let mut leader: Option<RegisteredLeader> = None;
    let mut ack_count = 0;

    info!(
        "HA Cache Cluster quorum handler is running. Quorum needed for a healthy cluster: {}",
        quorum + 1
    );

    loop {
        debug!("loop iteration in quorum_handler");

        let req = rx.recv_async().await;
        if req.is_err() {
            warn!("The 'quorum_handler' channel has been closed - exiting");
            break;
        }

        match req.unwrap() {
            // will be called if a client has established or lost the connection to a remote server
            QuorumReq::UpdateServer { server } => {
                match server.state {
                    RpcServerState::Alive => {
                        info!("Cache client connected successfully: {}", server.address);

                        if let QuorumState::LeaderTxAwait(addr) = &state {
                            if addr == &server.address {
                                let election_ts = leader
                                    .map(|l| match l {
                                        RegisteredLeader::Local(ts) => ts,
                                        RegisteredLeader::Remote(r) => r.election_ts,
                                    })
                                    .unwrap();
                                let ack = RpcRequest::LeaderReqAck {
                                    addr: addr.clone(),
                                    election_ts,
                                };
                                if let Some(tx) = &server.tx {
                                    if let Err(err) = tx.send_async(ack).await {
                                        error!("{:?}", CacheError::from(&err));
                                    }
                                }

                                leader = Some(RegisteredLeader::Remote(RpcServer {
                                    address: addr.clone(),
                                    state: RpcServerState::Alive,
                                    tx: server.tx.clone(),
                                    election_ts,
                                }));

                                state = QuorumState::Follower;
                                health_state.state = QuorumState::Follower;
                                health_state.tx_leader.clone_from(&server.tx);
                            }
                        }

                        if state == QuorumState::Retry && leader.is_some() {
                            let lead = leader.clone().unwrap();
                            match lead {
                                RegisteredLeader::Local(_) => {
                                    unreachable!(
                                        "QuorumReq::UpdateServer -> state == QuorumState::\
                                    Retry && leader.is_some() -> RegisteredLeader::Local"
                                    )
                                }
                                RegisteredLeader::Remote(l) => {
                                    if l.address == server.address {
                                        leader = Some(RegisteredLeader::Remote(RpcServer {
                                            address: server.address.clone(),
                                            state: RpcServerState::Alive,
                                            tx: server.tx.clone(),
                                            election_ts: l.election_ts,
                                        }));

                                        state = QuorumState::Follower;
                                        health_state.state = QuorumState::Follower;
                                        health_state.tx_leader.clone_from(&server.tx);

                                        tx_remote
                                            .send_async(RpcRequest::LeaderReqAck {
                                                addr: l.address,
                                                election_ts: l.election_ts,
                                            })
                                            .await
                                            .unwrap();
                                    }
                                }
                            }
                        }

                        hosts.insert(server.address.clone(), server);

                        let count = hosts.len();
                        if count >= quorum {
                            match health {
                                QuorumHealth::Good => {}
                                QuorumHealth::Bad | QuorumHealth::Degraded => {
                                    info!("QuorumHealth changed from 'Bad | Degraded' to 'Good'");

                                    // If our quorum switched from bad -> good, send out a leadership
                                    // request and set ourselves as leader directly, if we do not already
                                    // have one.
                                    if leader.is_none() {
                                        debug!("No registered leader - requesting the leadership");
                                        // safe to unwrap until `2262-04-11`
                                        let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
                                        leader = Some(RegisteredLeader::Local(election_ts));
                                        state = QuorumState::LeadershipRequested(election_ts);
                                        health_state.state =
                                            QuorumState::LeadershipRequested(election_ts);
                                        health_state.tx_leader = None;

                                        ack_count = 0;

                                        if let Err(err) = tx_remote
                                            .send_async(RpcRequest::LeaderReq {
                                                addr: whoami.clone(),
                                                election_ts,
                                            })
                                            .await
                                        {
                                            error!("{:?}", CacheError::from(&err));
                                        }
                                        election_timeout_handler(tx_quorum.clone(), false);
                                    }
                                }
                            }

                            health = if count == clients_count {
                                QuorumHealth::Good
                            } else {
                                QuorumHealth::Degraded
                            };
                            health_state.health = health.clone();
                        }
                    }

                    RpcServerState::Dead => {
                        debug!("Lost connection with cache client: {}", server.address);

                        hosts.remove(&server.address);

                        let count = hosts.len();
                        let mut leader_died = false;
                        if let Some(l) = leader.as_ref() {
                            match l {
                                RegisteredLeader::Local(_) => {}
                                RegisteredLeader::Remote(lead) => {
                                    if lead.address == server.address && count >= quorum {
                                        ack_count = 0;
                                        leader_died = true;
                                    }
                                }
                            }
                        }

                        if count >= quorum {
                            health = QuorumHealth::Degraded;
                            health_state.health = health.clone();
                            warn!("QuorumHealth changed from 'Good' to 'Degraded'");

                            if leader_died {
                                // If this state is set, we have acked to switch to another leader already
                                if state == QuorumState::LeaderDead {
                                    continue;
                                }

                                // Short sleep to better avoid race conditions
                                time::sleep(Duration::from_millis(get_rand_between(0, 100))).await;

                                // safe to unwrap until `2262-04-11`
                                let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
                                tx_remote
                                    .send_async(RpcRequest::LeaderSwitchDead {
                                        vote_host: whoami.clone(),
                                        election_ts,
                                    })
                                    .await
                                    .unwrap();

                                leader = Some(RegisteredLeader::Local(election_ts));
                                state = QuorumState::LeadershipRequested(election_ts);
                                health_state.state = QuorumState::LeadershipRequested(election_ts);
                                health_state.tx_leader = None;

                                warn!("The current leader died - starting a new election");

                                election_timeout_handler(tx_quorum.clone(), false);
                            }
                        } else {
                            match health {
                                QuorumHealth::Good | QuorumHealth::Degraded => {
                                    warn!("QuorumHealth changed from 'Good | Degraded' to 'Bad' - resetting local cache");
                                    leader = None;
                                    health = QuorumHealth::Bad;
                                    state = QuorumState::Undefined;
                                    health_state.health = QuorumHealth::Bad;
                                    health_state.state = QuorumState::Undefined;
                                    health_state.tx_leader = None;

                                    ack_count = 0;
                                    for tx in cache_map.values() {
                                        tx.send_async(CacheReq::Reset).await?;
                                    }
                                }
                                QuorumHealth::Bad => {}
                            }
                        }
                    }
                }

                health_state.connected_hosts = hosts.len();
                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
            }

            QuorumReq::GetReport { tx } => {
                let mut hosts_vec = vec![];
                for host in hosts.values() {
                    hosts_vec.push(host.clone());
                }
                let report = QuorumReport {
                    health: health.clone(),
                    state: state.clone(),
                    leader: leader.clone(),
                    hosts: hosts_vec,
                };

                match tx.send(report) {
                    Ok(_) => {}
                    Err(err) => {
                        error!("Error sending back QuorumReport value: {:?}", err);
                    }
                }
            }

            QuorumReq::LeaderInfo {
                addr,
                election_ts,
                has_quorum,
            } => {
                debug!(
                    "{} received QuorumReq::LeaderInfo: {:?} - quorum: {}",
                    whoami, addr, has_quorum
                );

                check_leadership_conflict(
                    &leader,
                    &tx_quorum,
                    election_ts,
                    &addr,
                    &whoami,
                    &tx_remote,
                )
                .await;

                if (has_quorum
                    && (state == QuorumState::Undefined || state == QuorumState::Retry)
                    && addr != whoami)
                    || leader.is_none()
                {
                    match get_leader_tx(&hosts, &addr).await {
                        None => {
                            debug!("Received LeaderInfo with quorum, but have no TX yet");
                            leader = Some(RegisteredLeader::Remote(RpcServer {
                                address: addr.clone(),
                                state: RpcServerState::Alive,
                                tx: None,
                                election_ts,
                            }));
                            state = QuorumState::LeaderTxAwait(addr.clone());
                            health_state.state = QuorumState::LeaderTxAwait(addr);
                        }
                        Some(tx) => {
                            let ack = RpcRequest::LeaderReqAck {
                                addr: addr.clone(),
                                election_ts,
                            };
                            if let Err(err) = tx.send_async(ack).await {
                                error!("{:?}", CacheError::from(&err));
                            }

                            leader = Some(RegisteredLeader::Remote(RpcServer {
                                address: addr,
                                state: RpcServerState::Alive,
                                tx: Some(tx.clone()),
                                election_ts,
                            }));
                            state = QuorumState::Follower;
                            health_state.state = QuorumState::Follower;
                            health_state.tx_leader = Some(tx);
                        }
                    };

                    log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                }
            }

            QuorumReq::LeaderReq { req, tx } => {
                let mut accepted = false;

                // If we received a LeaderReq while being in state LeadershipRequested, we will end
                // up in a conflicting situation with multiple nodes requesting leadership.
                // -> Check and resolve via request timestamp
                let is_conflict = check_leadership_conflict(
                    &leader,
                    &tx_quorum,
                    req.election_ts,
                    &req.addr,
                    &whoami,
                    &tx_remote,
                )
                .await;
                if !is_conflict {
                    let tx_leader = get_leader_tx(&hosts, &req.addr).await;
                    if tx_leader.is_some() {
                        leader = Some(RegisteredLeader::Remote(RpcServer {
                            address: req.addr,
                            state: RpcServerState::Alive,
                            tx: tx_leader.clone(),
                            election_ts: req.election_ts,
                        }));
                        health_state.tx_leader = tx_leader;

                        accepted = true;

                        debug!(
                            "Registered new leader in QuorumReq::LeaderReq: {:?}",
                            leader
                        );

                        // set a timeout task to double check and possibly reset, if we do not receive
                        // a LeaderAck in time
                        election_timeout_handler(tx_quorum.clone(), false);
                    } else {
                        debug!(
                            "Received QuorumReq::LeaderReq for not yet connected client: {} - trying again",
                            req.addr
                        );

                        leader = Some(RegisteredLeader::Remote(RpcServer {
                            address: req.addr,
                            state: RpcServerState::Alive,
                            tx: None,
                            election_ts: req.election_ts,
                        }));

                        state = QuorumState::Retry;
                        health_state.state = QuorumState::Retry;
                        health_state.tx_leader = None;

                        // set a timeout task to double check and possibly reset, if we do not receive
                        // a LeaderAck in time
                        election_timeout_handler(tx_quorum.clone(), true);
                    }

                    log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                };

                match tx.send(accepted) {
                    Ok(_) => {}
                    Err(err) => {
                        error!("Error sending back LeaderReq result: {:?}", err);
                    }
                }
            }

            QuorumReq::LeaderReqAck { addr, election_ts } => {
                debug!("{} received QuorumReq::LeaderReqAck for {}", whoami, addr);

                match &leader {
                    None => {
                        // If someone else ReqAcked another leader while we do not even have any, just
                        // accept it right away.
                        let tx_leader = get_leader_tx(&hosts, &addr).await;
                        leader = Some(RegisteredLeader::Remote(RpcServer {
                            address: addr.clone(),
                            state: RpcServerState::Alive,
                            tx: tx_leader.clone(),
                            election_ts,
                        }));
                        state = QuorumState::LeaderTxAwait(addr.clone());
                        health_state.state = QuorumState::LeaderTxAwait(addr.clone());
                        health_state.tx_leader = tx_leader;

                        tx_remote
                            .send_async(RpcRequest::LeaderReqAck { addr, election_ts })
                            .await
                            .unwrap();

                        election_timeout_handler(tx_quorum.clone(), false);
                        log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                    }

                    Some(lead) => {
                        check_leadership_conflict(
                            &leader,
                            &tx_quorum,
                            election_ts,
                            &addr,
                            &whoami,
                            &tx_remote,
                        )
                        .await;

                        match lead {
                            RegisteredLeader::Local(ts) => {
                                if addr == whoami {
                                    ack_count += 1;
                                    debug!(
                                        "Received a LeaderReqAck from {} and increasing counter to {}",
                                        addr, ack_count
                                    );

                                    if ack_count >= quorum {
                                        state = QuorumState::Leader;
                                        health_state.state = QuorumState::Leader;
                                        health_state.tx_leader = None;

                                        if let Err(err) = tx_remote
                                            .send_async(RpcRequest::LeaderAck {
                                                addr: whoami.clone(),
                                                election_ts: *ts,
                                            })
                                            .await
                                        {
                                            error!("{:?}", CacheError::from(&err));
                                        }
                                    }

                                    log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                                } else {
                                    debug!("Received a LeaderReqAck for remote Server {}", addr);
                                }
                            }
                            RegisteredLeader::Remote(_) => {
                                debug!("Received a LeaderReqAck for remote Server {}", addr);
                            }
                        }
                    }
                }
            }

            QuorumReq::LeaderAck { ack } => {
                match get_leader_tx(&hosts, &ack.addr).await {
                    None => {
                        leader = Some(RegisteredLeader::Remote(RpcServer {
                            address: ack.addr.clone(),
                            state: RpcServerState::Alive,
                            tx: None,
                            election_ts: ack.election_ts,
                        }));

                        state = QuorumState::LeaderTxAwait(ack.addr.clone());
                        health_state.state = QuorumState::LeaderTxAwait(ack.addr);
                        health_state.tx_leader = None;
                    }
                    Some(tx) => {
                        leader = Some(RegisteredLeader::Remote(RpcServer {
                            address: ack.addr,
                            state: RpcServerState::Alive,
                            tx: Some(tx.clone()),
                            election_ts: ack.election_ts,
                        }));

                        state = QuorumState::Follower;
                        health_state.state = QuorumState::Follower;
                        health_state.tx_leader = Some(tx);
                    }
                }

                // reset any possibly built-up ack_count from before -> we have a remote leader now
                ack_count = 0;

                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
            }

            QuorumReq::LeaderSwitch { req } => {
                debug!("QuorumReq::LeaderSwitch to: {}", req.vote_host);

                let addr = req.vote_host.clone();

                if whoami == req.vote_host {
                    info!("Taking over the leadership after QuorumReq::LeaderSwitch");
                    ack_count = 0;
                    leader = Some(RegisteredLeader::Local(req.election_ts));
                    health_state.state = QuorumState::LeaderSwitch;
                    health_state.tx_leader = None;
                } else {
                    let tx_leader = get_leader_tx(&hosts, &req.vote_host).await;
                    leader = Some(RegisteredLeader::Remote(RpcServer {
                        address: req.vote_host,
                        state: RpcServerState::Alive,
                        tx: tx_leader.clone(),
                        election_ts: req.election_ts,
                    }));
                    health_state.state = QuorumState::LeaderSwitch;
                    health_state.tx_leader = tx_leader;
                }

                state = QuorumState::LeaderSwitch;

                if let Err(err) = tx_remote
                    .send_async(RpcRequest::LeaderSwitchAck { addr })
                    .await
                {
                    error!("{:?}", CacheError::from(&err));
                }

                election_timeout_handler(tx_quorum.clone(), false);
                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
            }

            QuorumReq::LeaderSwitchDead { req } => {
                debug!("QuorumReq::LeaderSwitchDead to: {}", req.vote_host);

                // If we get a LeaderSwitchDead, we assume that a single host just crashed
                let tx_leader = get_leader_tx(&hosts, &req.vote_host).await;
                leader = Some(RegisteredLeader::Remote(RpcServer {
                    address: req.vote_host.clone(),
                    state: RpcServerState::Alive,
                    tx: tx_leader.clone(),
                    election_ts: req.election_ts,
                }));
                state = QuorumState::LeaderDead;
                health_state.state = QuorumState::LeaderDead;
                health_state.tx_leader = tx_leader;

                tx_remote
                    .send_async(RpcRequest::LeaderReqAck {
                        addr: req.vote_host,
                        election_ts: req.election_ts,
                    })
                    .await
                    .unwrap();

                election_timeout_handler(tx_quorum.clone(), false);
                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
            }

            // This is almost the same code as LeaderReqAck but is kept separate for better debugging
            QuorumReq::LeaderSwitchAck { req } => match &leader {
                None => unreachable!(),
                Some(lead) => match lead {
                    RegisteredLeader::Local(election_ts) => {
                        if req.addr == whoami {
                            ack_count += 1;
                            debug!(
                                "Received a LeaderSwitchAck: {:?}, and increasing counter to {}",
                                req, ack_count
                            );

                            if ack_count >= quorum {
                                state = QuorumState::Leader;
                                health_state.state = QuorumState::Leader;
                                if let Err(err) = tx_remote
                                    .send_async(RpcRequest::LeaderAck {
                                        addr: whoami.clone(),
                                        election_ts: *election_ts,
                                    })
                                    .await
                                {
                                    error!("{:?}", CacheError::from(&err));
                                }
                            }

                            log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                        } else {
                            debug!("Received a LeaderSwitchAck: {:?}", req);
                        }
                    }
                    RegisteredLeader::Remote(_) => {
                        debug!("Received a LeaderSwitchAck to remote: {:?}", req);
                    }
                },
            },

            QuorumReq::LeaderSwitchPriority { req } => {
                debug!("{}: QuorumReq::LeaderSwitchPriority to: {:?}", whoami, req);

                let do_switch = match &state {
                    QuorumState::Leader => {
                        // if we are the leader, compare priority
                        if let Some(RegisteredLeader::Local(ts)) = &leader {
                            req.election_ts < *ts
                        } else {
                            unreachable!("If we are the leader, leader can only be Some(RegisteredLeader::Local(_))")
                        }
                    }
                    QuorumState::LeaderDead => true,
                    QuorumState::LeaderSwitch => true,
                    QuorumState::LeaderTxAwait(addr) => {
                        // If we are waiting for a different leader's TX than this switch request,
                        // compare priorities
                        if &req.vote_host != addr {
                            if let Some(RegisteredLeader::Remote(lead)) = &leader {
                                req.election_ts < lead.election_ts
                            } else {
                                unreachable!("If we await a leader tx, leader can only be Some(RegisteredLeader::Remote(_))")
                            }
                        } else {
                            false
                        }
                    }
                    QuorumState::LeadershipRequested(ts) => {
                        // Only if the remote request has the higher priority do we switch;
                        // otherwise we ignore it.
                        req.election_ts < *ts
                    }
                    QuorumState::Follower => {
                        if let Some(RegisteredLeader::Remote(lead)) = &leader {
                            // If we are unlucky, a remote host might already be set as a 'real'
                            // leader while still waiting for other incoming leadership requests.
                            req.election_ts < lead.election_ts
                        } else {
                            unreachable!("If we are a follower, leader can only be Some(RegisteredLeader::Remote(_))")
                        }
                    }
                    QuorumState::Undefined => true,
                    QuorumState::Retry => true,
                };

                if do_switch {
                    if req.vote_host == whoami {
                        leader = Some(RegisteredLeader::Local(req.election_ts));
                        state = QuorumState::Leader;
                        health_state.state = QuorumState::Leader;
                        health_state.tx_leader = None;
                    } else {
                        let tx_leader = get_leader_tx(&hosts, &req.vote_host).await;
                        leader = Some(RegisteredLeader::Remote(RpcServer {
                            address: req.vote_host.clone(),
                            state: RpcServerState::Alive,
                            tx: tx_leader.clone(),
                            election_ts: req.election_ts,
                        }));
                        state = if tx_leader.is_some() {
                            QuorumState::Follower
                        } else {
                            QuorumState::LeaderTxAwait(req.vote_host.clone())
                        };
                        health_state.state = state.clone();
                        health_state.tx_leader = tx_leader;

                        tx_remote
                            .send_async(RpcRequest::LeaderReqAck {
                                addr: req.vote_host,
                                election_ts: req.election_ts,
                            })
                            .await
                            .unwrap();

                        // We only need to check again after the timeout, if we do not already have the
                        // leader tx
                        if state == QuorumState::LeaderDead {
                            election_timeout_handler(tx_quorum.clone(), false);
                        }
                    }
                } else {
                    debug!(
                        "Received a QuorumReq::LeaderSwitchPriority while still having a higher \
                        priority - ignoring it."
                    );
                }

                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
            }

            QuorumReq::CheckElectionTimeout => {
                debug!("QuorumReq::CheckElectionTimeout");
                if leader.is_none() {
                    continue;
                }

                match state {
                    QuorumState::Retry => {
                        debug!("QuorumReq::CheckElectionTimeout -> QuorumState::Retry");

                        // no matter what happens next, set to undefined state first to only retry once
                        state = QuorumState::Undefined;

                        match leader.clone().unwrap() {
                            RegisteredLeader::Local(_) => unreachable!(),
                            RegisteredLeader::Remote(l) => {
                                // We get here if we had a race condition on startup - this is the retry
                                if let Some(tx) = get_leader_tx(&hosts, &l.address).await {
                                    warn!("Retry after Cache Cluster Leader election timeout - this should not happen");

                                    leader = Some(RegisteredLeader::Remote(RpcServer {
                                        address: l.address.clone(),
                                        state: RpcServerState::Alive,
                                        tx: Some(tx.clone()),
                                        election_ts: l.election_ts,
                                    }));
                                    state = QuorumState::Follower;
                                    health_state.state = QuorumState::Follower;
                                    health_state.tx_leader = Some(tx);

                                    tx_remote
                                        .send_async(RpcRequest::LeaderReqAck {
                                            addr: l.address.clone(),
                                            election_ts: l.election_ts,
                                        })
                                        .await
                                        .unwrap();

                                    election_timeout_handler(tx_quorum.clone(), false);
                                    log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                                }
                            }
                        }
                    }

                    QuorumState::LeaderDead => match leader.as_ref().unwrap() {
                        RegisteredLeader::Local(_) => {
                            error!(
                                "Received QuorumState::LeaderDead when Leader == \
                            RegisteredLeader::Local - this should never happen"
                            );
                        }
                        RegisteredLeader::Remote(_) => {
                            warn!("The Cache Cluster Leader died");

                            if state == QuorumState::LeaderDead && hosts.len() >= quorum {
                                time::sleep(Duration::from_millis(get_rand_between(0, 100))).await;
                                info!("Quorum is still good after cache cluster leader died - requesting leadership");

                                // safe to unwrap until `2262-04-11`
                                let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
                                tx_remote
                                    .send_async(RpcRequest::LeaderSwitchDead {
                                        vote_host: whoami.clone(),
                                        election_ts,
                                    })
                                    .await
                                    .unwrap();

                                leader = Some(RegisteredLeader::Local(election_ts));
                                state = QuorumState::LeadershipRequested(election_ts);
                                health_state.state = QuorumState::LeadershipRequested(election_ts);
                                health_state.tx_leader = None;

                                election_timeout_handler(tx_quorum.clone(), false);
                                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                            }
                        }
                    },

                    QuorumState::LeadershipRequested(ts) => {
                        warn!(
                            "New Leader request has still not been validated after {} ms",
                            *ELECTION_TIMEOUT
                        );

                        match health {
                            // If quorum is good, just re-send the leader req
                            QuorumHealth::Good | QuorumHealth::Degraded => {
                                if let Err(err) = tx_remote
                                    .send_async(RpcRequest::LeaderReq {
                                        addr: whoami.clone(),
                                        election_ts: ts,
                                    })
                                    .await
                                {
                                    error!("{:?}", CacheError::from(&err));
                                }
                                election_timeout_handler(tx_quorum.clone(), false);
                                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                            }
                            QuorumHealth::Bad => leader = None,
                        }
                    }

                    QuorumState::LeaderSwitch => {
                        warn!(
                            "New LeaderSwitch request has still not been validated after {} ms",
                            *ELECTION_TIMEOUT
                        );

                        match health {
                            QuorumHealth::Good | QuorumHealth::Degraded => {
                                // safe to unwrap until `2262-04-11`
                                let election_ts = Utc::now().timestamp_nanos_opt().unwrap();
                                leader = Some(RegisteredLeader::Local(election_ts));
                                health_state.state = QuorumState::LeadershipRequested(election_ts);
                                health_state.tx_leader = None;

                                if let Err(err) = tx_remote
                                    .send_async(RpcRequest::LeaderReq {
                                        addr: whoami.clone(),
                                        election_ts,
                                    })
                                    .await
                                {
                                    error!("{:?}", CacheError::from(&err));
                                }
                                election_timeout_handler(tx_quorum.clone(), false);
                                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
                            }
                            QuorumHealth::Bad => leader = None,
                        }
                    }

                    _ => {}
                }
            }

            QuorumReq::HostShutdown => {
                if state == QuorumState::Leader {
                    info!("Current Cache Cluster Leader is shutting down");

                    // TODO maybe make this a more sophisticated vote depending on something like
                    // RTT or host resource usage
                    if let Some(addr) = hosts.keys().next() {
                        info!("Voting new leader after QuorumReq::HostShutdown: {}", addr);

                        if let Err(err) = tx_remote
                            .send_async(RpcRequest::LeaderSwitch {
                                vote_host: addr.clone(),
                                // safe to unwrap until `2262-04-11`
                                election_ts: Utc::now().timestamp_nanos_opt().unwrap(),
                            })
                            .await
                        {
                            debug!("Error sending RpcRequest::LeaderSwitch: {}", err);
                        }
                    }
                }

                let _ = tx_remote.send_async(RpcRequest::Shutdown).await;

                time::sleep(Duration::from_secs(1)).await;
                warn!("Exiting HA cache quorum handler");
                break;
            }

            QuorumReq::Print => {
                log_cluster_info(&leader, &hosts, &whoami, &health, &state);
            }
        }

        if let Err(err) = tx_watch.send(Some(QuorumHealthState {
            health: health.clone(),
            state: state.clone(),
            tx_leader: health_state.tx_leader.clone(),
            connected_hosts: hosts.len(),
        })) {
            debug!("'tx_watch' update error: {}", err);
        }
    }

    Ok(())
}

/// Checks for a conflicting situation within the leader election and sends out requests to solve
/// any possible conflicts. Returns `true` if there was a conflict and `false` otherwise.
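///
/// Priority is the election timestamp itself: the request with the lower (earlier)
/// `election_ts` wins. A minimal sketch of the rule, assuming two conflicting requests:
///
/// ```ignore
/// // local leadership requested at ts = 200, remote request arrives with ts = 100
/// // -> 100 < 200, the remote host wins and a LeaderSwitchPriority for it is sent out
/// ```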
#[inline]
async fn check_leadership_conflict(
    curr_leader: &Option<RegisteredLeader>,
    tx_quorum: &flume::Sender<QuorumReq>,
    ts_remote: i64,
    addr_remote: &str,
    whoami: &str,
    tx_remote: &flume::Sender<RpcRequest>,
) -> bool {
    // Check if we have a leader to compare the remote priority against
    let res = if let Some(lead) = curr_leader {
        match lead {
            RegisteredLeader::Local(ts) => Some((ts, whoami)),
            RegisteredLeader::Remote(srv) => Some((&srv.election_ts, srv.address.as_str())),
        }
    } else {
        None
    };

    if let Some((ts, addr)) = res {
        let req = if &ts_remote < ts {
            info!(
                "{} received new LeaderReq with higher priority - switching to {}",
                whoami, addr_remote
            );

            // If we need to switch, we need to send the priority switch to ourselves, since the
            // other request would only go out to the other cache members by default.
            let _ = tx_quorum
                .send_async(QuorumReq::LeaderSwitchPriority {
                    req: cache::LeaderSwitchPriority {
                        vote_host: addr_remote.to_string(),
                        election_ts: ts_remote,
                    },
                })
                .await;

            RpcRequest::LeaderSwitchPriority {
                vote_host: addr_remote.to_string(),
                election_ts: ts_remote,
            }
        } else {
            // To solve conflicts better, we send out a LeaderSwitchPriority in this case to
            // also inform all others that we have the higher priority.
            // TODO should we keep doing this or does it even work without it too?
            debug!(
                "{} received new LeaderReq with lower priority - sending out switch requests",
                whoami
            );

            RpcRequest::LeaderSwitchPriority {
                vote_host: addr.to_string(),
                election_ts: *ts,
            }
        };

        if let Err(err) = tx_remote.send_async(req).await {
            error!("Error sending RpcRequest::LeaderSwitchPriority: {}", err);
        }

        return true;
    }
    false
}

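/// Looks up the TX channel for the given peer address among the currently connected hosts.
/// Returns `None` if the host is unknown or its channel is not available (yet).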
#[inline]
async fn get_leader_tx(
    hosts: &HashMap<String, RpcServer>,
    addr: &str,
) -> Option<flume::Sender<RpcRequest>> {
    if let Some(host) = hosts.get(addr) {
        if host.tx.is_some() {
            return host.tx.clone();
        }
        warn!("Host TX is not available when it should be - returning None so the caller can retry");
    }
    None
}

/// This is a safety function to reset any new leader request if it was not validated with a quorum
/// after the given timeout.
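/// With `retry == true` it waits for `RECONNECT_TIMEOUT_UPPER + 100` ms, otherwise for
/// `ELECTION_TIMEOUT` ms, before sending a `QuorumReq::CheckElectionTimeout` back into the handler.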
fn election_timeout_handler(tx_quorum: flume::Sender<QuorumReq>, retry: bool) {
    let timeout = if retry {
        *RECONNECT_TIMEOUT_UPPER + 100
    } else {
        *ELECTION_TIMEOUT
    };
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(timeout)).await;
        // This might fail if the cache was shut down in between, otherwise it will always succeed
        let _ = tx_quorum.send_async(QuorumReq::CheckElectionTimeout).await;
    });
}

fn log_cluster_info(
    leader: &Option<RegisteredLeader>,
    clients: &HashMap<String, RpcServer>,
    whoami: &str,
    health: &QuorumHealth,
    state: &QuorumState,
) {
    let mut lead = if let Some(leader) = leader {
        match leader {
            RegisteredLeader::Local(_) => whoami,
            RegisteredLeader::Remote(l) => &l.address,
        }
    } else {
        "None"
    };
    if state == &QuorumState::Undefined || state == &QuorumState::Retry {
        lead = "Election in progress";
    }

    // TODO optimize and get rid of the Vec
    let mut clients_list = Vec::with_capacity(clients.len());
    for c in clients {
        clients_list.push(c.0);
    }

    debug!(
        r#"

    Cluster Leader: {}
    This Host:      {}
    Host State:     {:?}
    Clients:        {:?}
    Quorum Health:  {:?}
    "#,
        lead, whoami, state, clients_list, health
    )
}