dcs2_raft/server/
mod.rs

1use std::fmt::Debug;
2
3use log::{debug, error, info, trace};
4use rand::Rng;
5use rand_chacha::rand_core::SeedableRng;
6use serde::{Deserialize, Serialize};
7
8use dcs::communication::messages::{UpdateClusterVec, Header, Package, PackageBuilder};
9use dcs::communication::service::CommunicationService;
10use dcs::coordination::{CoordinationPackage, CoordinationService, Stopwatch};
11use dcs::heapless;
12use dcs::heapless::LinearMap;
13use dcs::nodes::SystemNodeId;
14use dcs::properties::CLUSTER_NODE_COUNT;
15use dcs::rules::measurements::{Measurement, SystemState};
16use dcs::rules::strategy::Rule;
17
18use crate::messages::RaftMessage::{AppendLog, ReadRequestReply, WriteRequestReply};
19use crate::messages::*;
20use crate::metadata::RaftMetadata;
21use crate::server::ElectionVote::Abstained;
22
23
24mod candidate;
25mod follower;
26mod leader;
27
/// A type that can produce a "no-operation" value.
///
/// [`RaftService::commit_noop`] stores `L::noop()` as the payload of the entry it appends.
pub trait NoOp {
    /// Builds the no-op value.
    fn noop() -> Self;
}
31
/// A type whose values can be combined pairwise into a single value.
pub trait Merge {
    /// Consumes `self` and `rhs` and returns the combined value.
    fn merge(self, rhs: Self) -> Self;
}
35
/// Bundle of the bounds required from the payload type stored in the Raft log.
///
/// The trait declares no items of its own; it only names the combination of its supertraits:
/// the payload can be built from a `(SystemNodeId, Measurement)` pair and converted into a
/// [`SystemState`].
pub trait LogData:
    Clone + Debug + NoOp + Merge + Serialize + From<(SystemNodeId, Measurement)> + Into<SystemState>
{
}
40
41impl<T> LogData for T where
42    T: Clone
43        + Debug
44        + NoOp
45        + Merge
46        + Serialize
47        + From<(SystemNodeId, Measurement)>
48        + Into<SystemState>
49{
50}
51
52struct IO<T: Stopwatch> {
53    timer: T,
54    timeout_secs: u64,
55}
56
57impl<T: Stopwatch> IO<T> {
58    fn to_millis(secs: u64) -> u64 {
59        secs * 1000
60    }
61
62    pub fn set_heartbeat_timeout(&mut self) {
63        self.timer = Stopwatch::from_millis(Self::to_millis(self.timeout_secs) / 5);
64    }
65
66    pub fn set_follower_timeout(&mut self) {
67        self.timer = Stopwatch::from_millis(Self::to_millis(self.timeout_secs));
68    }
69
70    pub fn set_candidate_timeout(&mut self) {
71        self.timer = Stopwatch::from_millis((Self::to_millis(self.timeout_secs) * 3) / 4);
72    }
73
74    pub fn get_leasing_time_secs(&mut self) -> u64 {
75        self.timeout_secs
76    }
77}
78
/// Role a node currently plays in the Raft cluster.
///
/// [`RaftService`] starts as `Follower` and switches to `Candidate` in `start_election`;
/// message handlers return the role the node should take next.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum MemberState {
    Leader,
    Follower,
    Candidate,
}
85
/// Behaviour of the node while its role is [`MemberState::Leader`].
///
/// The implementation is not part of this file (presumably it lives in the `leader` module).
trait LeaderBehavior<T: Stopwatch, L: LogData> {
    /// Handles one incoming package and returns the role the node must take afterwards;
    /// `RaftService::parse_message` stores the returned value as the current role.
    fn parse_message(
        &mut self,
        package: RaftPackage<L>,
        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
    ) -> MemberState;

    /// Runs at the end of every processing step, whether or not a message was handled.
    fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>);
}
95
/// Behaviour of the node while its role is [`MemberState::Candidate`].
///
/// The implementation is not part of this file (presumably it lives in the `candidate` module).
trait CandidateBehavior<T: Stopwatch, L: LogData> {
    /// Handles one incoming package and returns the role the node must take afterwards;
    /// `RaftService::parse_message` stores the returned value as the current role.
    fn parse_message(
        &mut self,
        package: RaftPackage<L>,
        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
    ) -> MemberState;

    /// Runs at the end of every processing step, whether or not a message was handled.
    fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>);
}
105
/// Behaviour of the node while its role is [`MemberState::Follower`].
///
/// The implementation is not part of this file (presumably it lives in the `follower` module).
trait FollowerBehavior<T: Stopwatch, L: LogData> {
    /// Handles one incoming package and returns the role the node must take afterwards;
    /// `RaftService::parse_message` stores the returned value as the current role.
    fn parse_message(
        &mut self,
        package: RaftPackage<L>,
        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
    ) -> MemberState;

    /// Runs at the end of every processing step, whether or not a message was handled.
    fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>);
}
115
/// A [`Package`] addressed with [`SystemNodeId`]s whose body is a [`RaftMessage`] carrying
/// log data of type `T`.
pub type RaftPackage<T> = Package<SystemNodeId, RaftMessage<T>>;
117
/// This is the implementation of the [`CoordinationService`] for the raft algorithm.
pub struct RaftService<T: Stopwatch, L: LogData> {
    /// Stopwatch and root timeout used to derive the role-specific timeouts.
    io: IO<T>,
    /// Current term; incremented by `start_election`, overwritten by `become_follower_of`.
    term: Term,
    /// Identifier of this node.
    id: SystemNodeId,
    /// Last leader this node has accepted, if any (set by `become_follower_of`).
    leader_id: Option<SystemNodeId>,
    /// Commit index; sent as `leader_commit` in `AppendLog` messages.
    commit_index: LogIndex,
    /// Initialised to 0; not otherwise touched in this file.
    last_applied: LogIndex,
    /// Role that decides which behaviour trait handles messages and ticks.
    current_role: MemberState,
    /// The replicated log.
    log: Log<L>,
    /// Bookkeeping for the other cluster members; this node's own id is filtered out when
    /// the map is built or updated.
    cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>,
    /// `true` once `cluster` has been populated; until then `process` does no Raft work.
    pub members_already_setup: bool,
    /// Initialised to `None`; its use is not visible in this file.
    next_config: Option<UpdateClusterVec>,
}
132
133impl<T: Stopwatch, L: LogData>
134    CoordinationService<
135        T,
136        RaftMessage<L>,
137        LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>,
138        RaftMetadata,
139    > for RaftService<T, L>
140{
141    /// The `id` will identify this nodes and the `metadata` will be used to obtain the root timeout.
142    /// From that value the heartbeat timeout, the candidate timeout, and the follower timeout will be computed
143    /// as follows:
144    /// * The heartbeat: this is the time between empty [`RaftMessage::AppendLog`] are sent to the followers
145    ///is set to be 20% of the root timeout.
146    /// * The candidate timeout: this is the time candidates will wait for the [`RaftMessage::AppendLogResponse`] before
147    /// starting a new election. This is set to be the 75% of the root timeout
148    /// * Follower timeout: this is the time a follower will wait for heartbeats, this is, [`AppendLog`] messages.
149    ///  This is set to be equal to the root timeout.
150    fn new(id: SystemNodeId, metadata: RaftMetadata) -> Self {
151        debug!(
152            "Intializaing Raft Coordination Service. Node timeout: {:?}s",
153            metadata.timeout
154        );
155        let timer = Stopwatch::from_millis(metadata.timeout * 1000);
156        Self::new(timer, None, id.into(), metadata.timeout)
157    }
158
    /// Returns the id of the leader this node currently knows about, or `None` if it has
    /// not accepted one yet.
    fn leader(&self) -> Option<SystemNodeId> {
        self.leader_id
    }
162
163    fn get_state(&self) -> SystemState {
164        debug!("Raft state: {:?}", self.log);
165        let mut state = SystemState::default();
166        for entry in self.log.iter().filter_map(|entry| entry.data.clone()) {
167            let state_entry : SystemState = entry.into();
168            state.extend(state_entry)
169        }
170        return state
171    }
172
173    fn get_current_rule(&self) -> Option<Rule> {
174        self.log.iter().filter_map(|entry| entry.rule).last()
175    }
176
177    fn update_rule(
178        &mut self,
179        communication_service: &mut dyn CommunicationService<Package<SystemNodeId, RaftMessage<L>>>,
180        new_rule: Rule
181    ) {
182        let message = RaftMessage::WriteRequest(WriteRequestArgs::with_rule(self.id, new_rule));
183        let package = package(message).from(self.id).to(self.id).build().unwrap();
184        self.process(communication_service, Some(package), None)
185    }
186
187    fn update_members(
188        &mut self,
189        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
190        new_config: UpdateClusterVec,
191    ) {
192        let message = RaftMessage::ConfigChange(new_config);
193        let package = package(message).from(self.id).to(self.id).build().unwrap();
194        self.process(communication_service, Some(package), None)
195    }
196
197    fn update_state(
198        &mut self,
199        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
200        measurement: Measurement,
201    ) {
202        if !self.is_leader() && self.leader_id.is_some() {
203            let measurement = RaftMessage::WriteRequest(WriteRequestArgs::with_measurement(self.id.into(), measurement));
204            let package = self
205                .build_message(measurement, self.leader_id.unwrap().into())
206                .unwrap();
207            communication_service.push(package);
208        } else {
209            let message = RaftMessage::WriteRequest(WriteRequestArgs::with_measurement(self.id, measurement));
210            let package = self.build_message(message, self.id).unwrap();
211            self.process(communication_service, Some(package), None)
212        }
213    }
214
    /// Trait entry point for one processing step.
    ///
    /// Wraps `members` in `Some` and forwards all arguments to the inherent
    /// `RaftService::process`, which takes the member map as an `Option`.
    fn process(
        &mut self,
        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
        package: Option<CoordinationPackage<RaftMessage<L>>>,
        members: LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>,
    ) {
        // The third argument is an `Option`, which matches the inherent method's signature
        // rather than this trait method's, so this is a delegation and not a recursive call.
        self.process(communication_service, package, Some(members))
    }
223}
224
225
226impl<T: Stopwatch, L: LogData> RaftService<T, L> {
227    pub(crate) fn new(
228        timer: T,
229        cluster: Option<LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>>,
230        id: SystemNodeId,
231        timeout_secs: u64,
232    ) -> Self {
233        RaftService {
234            id,
235            term: 0,
236            leader_id: None,
237            commit_index: 0,
238            last_applied: 0,
239            io: IO { timer, timeout_secs, },
240            log: Default::default(),
241            members_already_setup: cluster.is_some(),
242            current_role: MemberState::Follower,
243            cluster: cluster.unwrap_or_default(),
244            next_config: None,
245        }
246    }
247
248    pub fn is_leader(&self) -> bool {
249        self.current_role == MemberState::Leader
250    }
251
252    pub fn process(
253        &mut self,
254        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
255        message: Option<Package<SystemNodeId, RaftMessage<L>>>,
256        members: Option<LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>>,
257    ) {
258        if !self.members_already_setup {
259            self.set_up_members_for_the_first_time(members);
260        }
261        if let Some(msg) = message.clone() {
262            if msg.header.from != msg.header.to {
263                debug!("[RAFT] ({}) ==|{}|==> ({})", msg.header.from, msg.body, msg.header.to);
264            }
265        }
266        if self.members_already_setup {
267            trace!("Current log status: {:?}", self.log);
268            self.io.timer.update();
269            if let Some(package) = message {
270                self.parse_message(package, communication_service);
271            } else {
272                trace!("No message received, ticking!");
273            }
274
275            self.after_tick(communication_service);
276        }
277    }
278
279    fn set_up_members_for_the_first_time(
280        &mut self,
281        members: Option<LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>>,
282    ) {
283        if let Some(members) = members {
284            debug!("Setting up members for the first time, cluster: {:?}", members);
285            self.cluster = members
286                .iter()
287                .filter(|(&id, _)| id != self.id)
288                .map(|(&id, _metadata)| {
289                    (
290                        id,
291                        ClusterMember {
292                            id: id.into(),
293                            vote_granted: ElectionVote::Abstained,
294                            next_idx: 1,
295                            match_idx: 0,
296                            last_successful_heartbeat: 0,
297                        },
298                    )
299                })
300                .collect();
301            self.members_already_setup = true;
302        }
303    }
304
305    pub fn tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>) {
306        let message = communication_service.pop();
307        self.process(communication_service, message, None)
308    }
309
310    fn parse_message(
311        &mut self,
312        package: RaftPackage<L>,
313        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
314    ) {
315        debug!("Processing package {:?} as {:?}.", package, self.current_role);
316
317        self.current_role = match self.current_role {
318            MemberState::Leader => (self as &mut dyn LeaderBehavior<T, L>)
319                .parse_message(package, communication_service),
320            MemberState::Follower => (self as &mut dyn FollowerBehavior<T, L>)
321                .parse_message(package, communication_service),
322            MemberState::Candidate => (self as &mut dyn CandidateBehavior<T, L>)
323                .parse_message(package, communication_service),
324        };
325    }
326
327    fn update_ttl(&mut self, header: &Header<SystemNodeId>) {
328        if let Some(node) = self.cluster.get_mut(&header.from) {
329            node.last_successful_heartbeat = self.io.timer.current_time_as_secs();
330        }
331    }
332
    /// Starts a new election.
    ///
    /// Increments the term, switches the role to candidate, resets the vote recorded for every
    /// member, sends a `RequestVote` to the cluster and replaces the timer with the candidate
    /// timeout.
    fn start_election(
        &mut self,
        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
    ) {
        // The term must be bumped before the vote requests are built: they carry `self.term`.
        self.term += 1;
        self.current_role = MemberState::Candidate;
        self.clean_state_from_previous_election();
        self.send_election_messages(communication_service);
        self.io.set_candidate_timeout();
    }
343
344    fn clean_state_from_previous_election(&mut self) {
345        self.cluster
346            .values_mut()
347            .for_each(|member| member.vote_granted = Abstained);
348    }
349
350    fn send_election_messages(
351        &mut self,
352        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
353    ) {
354        info!("Node #{} starting leader election.", self.id);
355        debug!("Election started in cluster: {:?}.", self.cluster);
356        let cluster = &self.cluster;
357        let term = self.term;
358        let id = self.id;
359        cluster
360            .values()
361            .into_iter()
362            .filter(|member: &&ClusterMember| member.id != id)
363            .for_each(|member| {
364                let msg = RaftPackageBuilder::default()
365                    .from(self.id)
366                    .to(member.id)
367                    .with_message(RaftMessage::RequestVote(RequestVoteArgs {
368                        term,
369                        prev_log_index: self.get_last_log_index(),
370                        prev_log_term: self.get_last_log_term(),
371                    }))
372                    .build()
373                    .ok();
374                if let Some(pkg) = msg {
375                    communication_service.push(pkg)
376                }
377            });
378    }
379
380    pub(crate) fn build_message(
381        &self,
382        msg: RaftMessage<L>,
383        to: SystemNodeId,
384    ) -> Option<Package<SystemNodeId, RaftMessage<L>>> {
385        package(msg)
386            .from(self.id)
387            .to(to)
388            .build()
389            .ok()
390    }
391
392    pub(crate) fn broadcast_message(
393        &mut self,
394        message: RaftMessage<L>,
395        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
396    ) {
397        let my_id = self.id;
398        let member_ids = self
399            .cluster
400            .values()
401            .map(|m| m.id)
402            .filter(|member_id| member_id != &my_id);
403        for member in member_ids {
404            self.send_message_to(message.clone(), member, communication_service)
405        }
406    }
407
408    fn send_message_to(
409        &self,
410        message: RaftMessage<L>,
411        member: SystemNodeId,
412        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
413    ) {
414        if let Some(msg) = self.build_message(message, member.into()) {
415            communication_service.push(msg)
416        }
417    }
418
419    pub(crate) fn reset_next_index(&mut self) {
420        let next_log_idx = self.get_last_log_index() + 1;
421        self.cluster
422            .iter_mut()
423            .for_each(|(_id, member)| member.next_idx = next_log_idx)
424    }
425
426    pub(crate) fn reset_match_index(&mut self) {
427        self.cluster
428            .iter_mut()
429            .for_each(|(_id, member)| member.match_idx = 0)
430    }
431
432    pub(crate) fn commit_noop(
433        &mut self,
434        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
435    ) {
436        let prev_log_index = self.get_last_log_index();
437        let prev_log_term = self.get_last_log_term();
438        let noop: LogEntry<L> = LogEntry::with_data(self.term, Some(L::noop()));
439        let _ = self.log.push(noop.clone());
440        let mut entries = Log::new();
441        let _ = entries.push(noop);
442        self.broadcast_message(
443            AppendLog(AppendLogArgs {
444                term: self.term,
445                prev_log_index,
446                prev_log_term,
447                entries,
448                leader_commit: self.commit_index,
449            }),
450            communication_service,
451        );
452    }
453
454    pub fn reject_read_request(
455        &mut self,
456        header: Header<SystemNodeId>,
457        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
458    ) {
459        let reject_read_request = ReadRequestReply(ReadRequestReplyArgs::fail(self.leader_id));
460        communication_service.push(
461            self.build_message(reject_read_request, header.from)
462                .unwrap(),
463        )
464    }
465
466    pub fn reject_write_request(
467        &mut self,
468        header: Header<SystemNodeId>,
469        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
470    ) {
471        let msg = self
472            .build_message(WriteRequestReply(false, self.leader_id), header.from)
473            .unwrap();
474
475        communication_service.push(msg)
476    }
477
478    pub(crate) fn get_last_log_term(&self) -> Term {
479        if let Some(entry) = self.log.last() {
480            entry.term
481        } else {
482            0
483        }
484    }
485
    /// Returns the log's last index, converted to [`LogIndex`].
    pub(crate) fn get_last_log_index(&self) -> LogIndex {
        self.log.last_index() as LogIndex
    }
489
490    fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>) {
491        if self.log.capacity() < 0.25 {
492            info!("Creating snapshot with commit-idx {}.", self.commit_index);
493            debug!("Log to snapshot: {:?}", self.log);
494            self.log.snapshot(self.commit_index);
495            debug!("Snapshot result: {:?}", self.log);
496        }
497        match self.current_role {
498            MemberState::Leader => (self as &mut dyn LeaderBehavior<T, L>).after_tick(communication_service),
499            MemberState::Follower => (self as &mut dyn FollowerBehavior<T, L>).after_tick(communication_service),
500            MemberState::Candidate => (self as &mut dyn CandidateBehavior<T, L>).after_tick(communication_service),
501        }
502    }
503
504    fn send_heartbeat_to_followers(
505        &mut self,
506        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
507    ) {
508        let message = self.make_heartbeat();
509        let my_id = self.id;
510        let heartbeat_ttl = self.io.get_leasing_time_secs() / 2;
511        let now = self.io.timer.current_time_as_secs();
512        let member_ids = self
513            .cluster
514            .values()
515            .filter(|m| now - m.last_successful_heartbeat > heartbeat_ttl)
516            .map(|m| m.id)
517            .filter(|member_id| member_id != &my_id);
518        for member in member_ids {
519            self.send_message_to(message.clone(), member, communication_service)
520        }
521    }
522
523    fn make_heartbeat(&mut self) -> RaftMessage<L> {
524        let prev_log_index = self.get_last_log_index();
525        let prev_log_term = self.get_last_log_term();
526        RaftMessage::AppendLog(AppendLogArgs {
527            term: self.term,
528            prev_log_index,
529            prev_log_term,
530            entries: Log::new(),
531            leader_commit: self.commit_index,
532        })
533    }
534
535    fn send_install_snapshot_to(
536        &self,
537        member: &ClusterMember,
538        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
539    ) {
540        let snapshot = self.log.get(1).unwrap();
541        let args = InstallSnapshotArgs {
542            term: self.term,
543            leader_id: self.id,
544            last_included_index: self.log.last_included_index(),
545            last_included_term: snapshot.term,
546            data: snapshot.clone(),
547        };
548        self.send_message_to(
549            RaftMessage::InstallSnapshot(args),
550            member.id,
551            communication_service,
552        );
553    }
554
555    fn send_next_entry_to(
556        &self,
557        member: &ClusterMember,
558        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
559    ) {
560        let (prev_log_index, prev_log_term) = if member.next_idx > 1 {
561            let idx = member.next_idx - 1;
562            let term = self
563                .log
564                .get(idx as LogIndex)
565                .map(|entry| entry.term)
566                .unwrap_or_default();
567            (idx as LogIndex, term)
568        } else {
569            (0, 0)
570        };
571        trace!("Send next entry to {member:?} with previous-log-idx {prev_log_index} and term {prev_log_term}");
572        if let Some(current_entry) = self.log.get(member.next_idx) {
573            let mut entries = Log::new();
574            let _ = entries.push(current_entry.clone());
575            let append_log_msg = AppendLog(AppendLogArgs {
576                term: self.term,
577                prev_log_index,
578                prev_log_term,
579                entries,
580                leader_commit: self.commit_index,
581            });
582            trace!("Sending next entry to {member:?} entry: {append_log_msg:?}");
583            self.send_message_to(append_log_msg, member.id, communication_service);
584        }
585    }
586
587    pub fn current_config(&self) -> UpdateClusterVec {
588        let mut current_config = UpdateClusterVec::from_iter(self.cluster.values().map(|v| v.id.into()));
589        current_config.push(self.id.into());
590        current_config.sort();
591        current_config
592    }
593
594    pub fn update_config(&mut self, new_cluster: &UpdateClusterVec) {
595        for member_id in new_cluster {
596            if !self.cluster.contains_key(&member_id) && *member_id != self.id {
597                let new_member = ClusterMember {
598                    id: *member_id,
599                    vote_granted: ElectionVote::Abstained,
600                    next_idx: 1,
601                    match_idx: 0,
602                    last_successful_heartbeat: 0,
603                };
604                debug!("Adding new member to cluster {new_member:?}");
605                if self.cluster.insert(*member_id, new_member).is_err() {
606                    error!("Couldn't insert node #{} into cluster data.", member_id)
607                }
608            }
609        }
610
611        let mut members_to_delete = heapless::Vec::<SystemNodeId, CLUSTER_NODE_COUNT>::new();
612        for system_id in self.cluster.keys() {
613            if !new_cluster.contains(&system_id) {
614                members_to_delete.push(*system_id);
615            }
616        }
617        members_to_delete.into_iter().for_each(|member| {
618            self.cluster.remove(&member);
619        });
620    }
621
622    pub fn update_commit_index(&mut self) {
623        let mut commit_indexes: Vec<LogIndex> = self
624            .cluster
625            .values()
626            .map(|member| member.next_idx.checked_sub(1).unwrap_or_default())
627            .collect();
628        commit_indexes.sort_by(|x, y| x.cmp(y).reverse());
629        let mut latest_entry_shared = commit_indexes.first().cloned();
630        for index in commit_indexes.iter() {
631            let mut count = commit_indexes
632                .iter()
633                .filter(|other_index| other_index <= &index)
634                .count();
635            if self.commit_index <= *index {
636                count += 1
637            }
638            if count >= (self.cluster.len() + 1) / 2 {
639                latest_entry_shared = Some(*index);
640                break;
641            }
642        }
643        if let Some(index) = latest_entry_shared {
644            if index >= self.commit_index {
645                self.commit_index = index;
646            }
647        }
648    }
649
650    pub fn replicate_entry(
651        &mut self,
652        entry: LogEntry<L>,
653        communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
654    ) {
655        let mut entries = Log::new();
656        let _ = entries.push(entry);
657        let append_log_msg = RaftMessage::AppendLog(AppendLogArgs {
658            term: self.term,
659            prev_log_index: self.get_last_log_index(),
660            prev_log_term: self.get_last_log_term(),
661            entries,
662            leader_commit: self.commit_index,
663        });
664        self.broadcast_message(append_log_msg, communication_service);
665    }
666
    /// Accepts `leader_id` as the leader for `new_term`.
    ///
    /// Replaces the timer with the follower timeout, adopts `new_term` and stores the leader.
    /// The role itself is not changed here.
    fn become_follower_of(&mut self, leader_id: SystemNodeId, new_term: Term) {
        self.io.set_follower_timeout();
        self.term = new_term;
        debug!("Updating leader_id to: {}", leader_id);
        self.leader_id = Some(leader_id);
    }
673}
674
/// Starts a package builder that already carries `msg`; the caller still has to set the
/// sender and the receiver before building.
fn package<T: LogData>(msg: RaftMessage<T>) -> RaftPackageBuilder<T> {
    RaftPackageBuilder::default().with_message(msg)
}
678
/// Vote recorded for a cluster member in the current election.
///
/// Every member is reset to `Abstained` when a new election starts.
#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
pub enum ElectionVote {
    Granted,
    Against,
    Abstained,
}
685
/// Per-member bookkeeping kept by a node for each of the other cluster members.
#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
pub struct ClusterMember {
    /// Identifier of the member.
    pub id: SystemNodeId,
    /// Vote recorded for this member in the current election.
    pub vote_granted: ElectionVote,
    /// Index of the next log entry to send to this member; starts at 1.
    pub next_idx: LogIndex,
    /// Starts at 0 and is reset to 0 by `reset_match_index`.
    pub match_idx: LogIndex,
    /// Timer time, in seconds, of the last successful heartbeat; starts at 0.
    pub last_successful_heartbeat: u64,
}
694
695#[cfg(test)]
696#[allow(non_snake_case)]
697mod test_utils {
698    extern crate log;
699    extern crate std;
700
701    use core::fmt::Debug;
702    use std::println;
703    use std::sync::atomic::{AtomicUsize, Ordering};
704    use std::sync::mpsc;
705    use std::sync::mpsc::{Receiver, Sender};
706
707    use serde::{Deserialize, Serialize};
708
709    use dcs::communication::connection::{InMsgQueue, OutMsgQueue, Readable, Writable};
710    use dcs::communication::messages::{PackageBuilder};
711    use dcs::communication::service::CommunicationService;
712    use dcs::coordination::Stopwatch;
713    use dcs::nodes::SystemNodeId;
714    use dcs::rules::measurements::{Measurement, SystemState};
715
716    use crate::messages::RaftMessage::RequestVote;
717    use crate::messages::{RaftMessage, WriteRequestArgs};
718    use crate::server::{package, LogData, Merge, NoOp, RaftPackage, RaftService};
719    use crate::{CANDIATE_ELECTION_TIMEOUT, ELECTION_TIMEOUT};
720
721    static CLOCK: AtomicUsize = AtomicUsize::new(0);
722
723    pub struct FakeTimer {
724        clock: u32,
725        timeout: u32,
726    }
727
728    impl FakeTimer {
729        fn update_timer(&mut self) {
730            CLOCK.fetch_add(1, Ordering::Relaxed);
731            self.clock += 1;
732            if self.timeout > 0 {
733                self.timeout -= 1;
734            }
735        }
736        pub fn new() -> Self
737        where
738            Self: Sized,
739        {
740            FakeTimer {
741                clock: 0,
742                timeout: 0,
743            }
744        }
745    }
746
747    impl Stopwatch for FakeTimer {
748        fn from_millis(secs: u64) -> Self {
749            Self {
750                clock: 0 as u32,
751                timeout: secs as u32,
752            }
753        }
754
755        fn restart(&mut self) {}
756
757        fn is_timeout(&self) -> bool {
758            self.timeout == 0
759        }
760
761        fn current_time_as_secs(&mut self) -> u64 {
762            CLOCK.load(Ordering::Relaxed) as u64
763        }
764
765        fn as_secs(&self) -> u64 { unreachable!() }
766
767        fn update(&mut self) {
768            self.update_timer()
769        }
770    }
771
772    #[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
773    pub struct TestState(pub u32, pub u32);
774
775    impl From<(SystemNodeId, Measurement)> for TestState {
776        fn from((_, m): (SystemNodeId, Measurement)) -> Self {
777            TestState(m.value as u32, 0)
778        }
779    }
780
781    impl From<TestState> for SystemState {
782        fn from(_: TestState) -> Self {
783            unreachable!()
784        }
785    }
786
787    impl NoOp for TestState {
788        fn noop() -> Self {
789            Self(0, 0)
790        }
791    }
792
793    impl Merge for TestState {
794        fn merge(self, rhs: Self) -> Self {
795            Self(self.0 + rhs.0, self.1 + rhs.1)
796        }
797    }
798
799    pub fn assert_message_is_vote_request<T: LogData + Debug>(actual_msg: &RaftMessage<T>) {
800        match actual_msg {
801            RequestVote { .. } => success(),
802            _ => fail(),
803        }
804    }
805
    /// Fails the running test by panicking.
    pub fn fail() {
        unreachable!()
    }
809
    /// Marks the expected branch of a check; the assertion always holds.
    pub fn success() {
        assert!(true)
    }
813
    /// Ticks `candidate` `CANDIATE_ELECTION_TIMEOUT` times.
    pub fn timeout_candidate<T: LogData + Debug>(
        candidate: &mut RaftService<FakeTimer, T>,
        communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
    ) {
        tick_times(CANDIATE_ELECTION_TIMEOUT, candidate, communication_service);
    }
820
821    pub fn almost_timeout_follower<T: LogData + Debug>(
822        candidate: &mut RaftService<FakeTimer, T>,
823        communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
824    ) {
825        tick_times(ELECTION_TIMEOUT - 1, candidate, communication_service);
826    }
827
828    pub fn timeout_follower<T: LogData + Debug>(
829        candidate: &mut RaftService<FakeTimer, T>,
830        communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
831    ) {
832        tick_times(ELECTION_TIMEOUT, candidate, communication_service);
833    }
834
835    fn tick_times<T: LogData + Debug>(
836        times: u64,
837        candidate: &mut RaftService<FakeTimer, T>,
838        communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
839    ) {
840        for _i in 0..times {
841            candidate.tick(communication_service);
842        }
843    }
844
845    pub fn read_msg_sent_by_server<T: LogData + Debug>(
846        in_queue_test: &mut dyn InMsgQueue<RaftPackage<T>>,
847    ) -> RaftMessage<T> {
848        let package = in_queue_test
849            .pop()
850            .expect("Message wasn't received by server");
851        package.body
852    }
853
854    pub fn get_destination_of_msg<T: LogData + Debug>(
855        in_queue_test: &mut dyn InMsgQueue<RaftPackage<T>>,
856    ) -> SystemNodeId {
857        in_queue_test.pop().unwrap().header.to
858    }
859
860    pub struct FakeDcsMsgQueue;
861
862    impl FakeDcsMsgQueue {
863        pub fn build_pair() -> (
864            FakeDcsInMsgQueue<RaftPackage<TestState>>,
865            FakeDcsOutMsgQueue<RaftPackage<TestState>>,
866        ) {
867            let (tx, rx) = mpsc::channel::<RaftPackage<TestState>>();
868            (
869                FakeDcsInMsgQueue { in_channel: rx },
870                FakeDcsOutMsgQueue { out_channel: tx },
871            )
872        }
873    }
874
    /// Sending half of a fake queue pair; wraps the sender side of a channel.
    pub struct FakeDcsOutMsgQueue<T: Writable + Debug> {
        out_channel: Sender<T>,
    }
878
    /// Receiving half of a fake queue pair; wraps the receiver side of a channel.
    #[derive(Debug)]
    pub struct FakeDcsInMsgQueue<T: Readable + Debug> {
        in_channel: Receiver<T>,
    }
883
884    impl<T: Writable + Send + Debug> OutMsgQueue<T> for FakeDcsOutMsgQueue<T> {
885        fn push(&mut self, msg: T) {
886            println!("PUSH {:?}", msg);
887            let _ = self.out_channel.send(msg);
888        }
889    }
890
891    impl<T: Readable + Debug> InMsgQueue<T> for FakeDcsInMsgQueue<T> {
892        fn pop(&mut self) -> Option<T> {
893            match self.in_channel.try_recv() {
894                Ok(msg) => {
895                    println!("POP {:?}", msg);
896                    Some(msg)
897                }
898                Err(_) => None,
899            }
900        }
901    }
902
903    pub struct FakeCommunicationService<'a> {
904        out_queue: &'a mut dyn OutMsgQueue<RaftPackage<TestState>>,
905        in_queue: &'a mut dyn InMsgQueue<RaftPackage<TestState>>,
906    }
907
908    impl<'a> FakeCommunicationService<'a> {
909        pub fn new(
910            out_queue: &'a mut dyn OutMsgQueue<RaftPackage<TestState>>,
911            in_queue: &'a mut dyn InMsgQueue<RaftPackage<TestState>>,
912        ) -> Self {
913            Self {
914                out_queue,
915                in_queue,
916            }
917        }
918    }
919
920    impl<'a> OutMsgQueue<RaftPackage<TestState>> for FakeCommunicationService<'a> {
921        fn push(&mut self, msg: RaftPackage<TestState>) {
922            self.out_queue.push(msg);
923        }
924    }
925
926    impl<'a> InMsgQueue<RaftPackage<TestState>> for FakeCommunicationService<'a> {
927        fn pop(&mut self) -> Option<RaftPackage<TestState>> {
928            self.in_queue.pop()
929        }
930    }
931
932    impl<'a> CommunicationService<RaftPackage<TestState>> for FakeCommunicationService<'a> {}
933
934    pub fn configure_read_only_request<T: LogData + Debug>(id: SystemNodeId) -> RaftPackage<T> {
935        package(RaftMessage::ReadRequest)
936            .from(0.into())
937            .to(id)
938            .build()
939            .unwrap()
940    }
941
942    pub fn send_read_request(
943        server: &mut RaftService<FakeTimer, TestState>,
944        out_queue_test: &mut dyn OutMsgQueue<RaftPackage<TestState>>,
945        communication_service: &mut dyn CommunicationService<RaftPackage<TestState>>,
946    ) {
947        let client_request = configure_read_only_request(server.id);
948        out_queue_test.push(client_request);
949        server.tick(communication_service);
950    }
951
952    pub fn send_write_request(
953        server: &mut RaftService<FakeTimer, TestState>,
954        out_queue_test: &mut dyn OutMsgQueue<RaftPackage<TestState>>,
955        data: Measurement,
956        communication_service: &mut dyn CommunicationService<RaftPackage<TestState>>,
957    ) {
958        let server_id = server.id;
959        let client_request = configure_write_request(server_id, data);
960        out_queue_test.push(client_request);
961        server.tick(communication_service);
962    }
963
964    pub fn configure_write_request(
965        id: SystemNodeId,
966        data: Measurement,
967    ) -> RaftPackage<TestState> {
968        let current_log = RaftMessage::WriteRequest(WriteRequestArgs::with_measurement(id, data));
969        package(current_log)
970            .from(2.into())
971            .to(id)
972            .build()
973            .unwrap()
974    }
975}
976
977#[cfg(test)]
978pub mod test_server_builder {
979    extern crate log;
980    extern crate std;
981
982    use std::borrow::BorrowMut;
983    use std::collections::HashMap;
984    use std::iter::FromFn;
985    use std::ops::DerefMut;
986
987    use dcs::communication::connection::{InMsgQueue, OutMsgQueue};
988    use dcs::communication::messages::PackageBuilder;
989    use dcs::communication::service::CommunicationService;
990    use dcs::coordination::Stopwatch;
991    use dcs::heapless;
992    use dcs::heapless::LinearMap;
993    use dcs::nodes::SystemNodeId;
994    use dcs::rules::measurements::Measurement;
995    use dcs::rules::measurements::ClusterType::TEMPERATURE;
996
997    use crate::messages::*;
998    use crate::server::test_utils::*;
999    use crate::server::ElectionVote::Abstained;
1000    use crate::server::*;
1001    use crate::server::{package, CLUSTER_NODE_COUNT};
1002    use crate::state::RaftState;
1003    use crate::RaftMessage::*;
1004    use crate::{RaftPackageBuilder, CANDIATE_ELECTION_TIMEOUT, ELECTION_TIMEOUT};
1005
1006    pub struct TestContext<'a> {
1007        in_queue_test: FakeDcsInMsgQueue<RaftPackage<TestState>>,
1008        out_queue_test: FakeDcsOutMsgQueue<RaftPackage<TestState>>,
1009        server: RaftService<FakeTimer, TestState>,
1010        cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>,
1011        comm_service: &'a mut FakeCommunicationService<'a>,
1012    }
1013
1014    impl<'a> TestContext<'a> {
1015        pub fn get_node_mut(&mut self) -> &mut RaftService<FakeTimer, TestState> {
1016            &mut self.server
1017        }
1018
1019        pub fn get_node(&self) -> &RaftService<FakeTimer, TestState> {
1020            &self.server
1021        }
1022        pub fn term(&self) -> Term {
1023            self.server.term
1024        }
1025        pub fn tick(&mut self) {
1026            let server = &mut self.server;
1027            let mut comm_service = self.comm_service.deref_mut();
1028            server.tick(comm_service)
1029        }
1030
1031        pub fn cluster_size(&self) -> usize {
1032            self.server.cluster.len() + 1
1033        }
1034
1035        pub fn cluster(&self) -> LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> {
1036            self.server.cluster.clone()
1037        }
1038
1039        pub fn id(&self) -> SystemNodeId {
1040            self.server.id
1041        }
1042
1043        pub fn commit_idx(&self) -> LogIndex {
1044            self.server.commit_index
1045        }
1046
1047        pub fn send_read_request(&mut self) {
1048            let server_id = self.server.id;
1049            let client_request = configure_read_only_request(server_id);
1050            self.send(client_request);
1051        }
1052
1053        pub fn send_read_request_and_tick(&mut self) {
1054            self.send_read_request();
1055            self.tick();
1056        }
1057
1058        pub fn send_write_request(&mut self, data: Measurement) {
1059            let server_id = self.server.id;
1060            let client_request = configure_write_request(server_id, data);
1061            self.send(client_request);
1062        }
1063
1064        pub fn send_write_request_and_tick(&mut self, data: Measurement) {
1065            self.send_write_request(data);
1066            self.tick();
1067        }
1068
1069        pub fn empty_queue(&mut self) {
1070            let _ = self.messages();
1071        }
1072
1073        pub fn messages(&mut self) -> std::vec::Vec<RaftPackage<TestState>> {
1074            let mut messages = vec![];
1075            let mut pkg = self.recv();
1076            while pkg.is_some() {
1077                messages.push(pkg.unwrap());
1078                pkg = self.recv();
1079            }
1080            messages
1081        }
1082
1083        pub fn append_log_requests(&mut self) -> Vec<AppendLogArgs<TestState>> {
1084            let mut messages = self.messages();
1085            let messages: Vec<_> = messages
1086                .into_iter()
1087                .filter_map(|pkg| {
1088                    if let RaftMessage::AppendLog(args) = pkg.body {
1089                        Some(args)
1090                    } else {
1091                        None
1092                    }
1093                })
1094                .collect();
1095            messages
1096        }
1097
1098        pub fn count_heartbeats(&mut self) -> usize {
1099            let queue = &mut self.in_queue_test;
1100            let iter = std::iter::from_fn(|| queue.pop());
1101            iter.map(|pkg| pkg.body)
1102                .filter(|msg| matches!(msg, RaftMessage::AppendLog(args)))
1103                .count()
1104        }
1105
1106        pub fn entry_was_broadcasted(&mut self, entry: LogEntry<TestState>) -> bool {
1107            let mut appendlog_messages: Vec<RaftPackage<TestState>> = self
1108                .messages()
1109                .into_iter()
1110                .filter(|pkg| matches!(pkg.get_message(), RaftMessage::AppendLog(args)))
1111                .collect();
1112            let mut expected_entries = Log::new();
1113            expected_entries.push(entry);
1114
1115            let mut count = 0;
1116            for member in self.cluster.keys() {
1117                let broadcasted = appendlog_messages
1118                    .iter()
1119                    .filter(|pkg| pkg.header.to == *member)
1120                    .filter(|pkg| {
1121                        if let AppendLog(args) = pkg.get_message() {
1122                            args.entries == expected_entries
1123                        } else {
1124                            false
1125                        }
1126                    })
1127                    .count();
1128                if broadcasted != 0 {
1129                    count += 1
1130                }
1131            }
1132            count == self.cluster.len()
1133        }
1134
1135        pub fn get_read_response(&mut self) -> Option<ReadRequestReplyArgs<TestState>> {
1136            self.messages()
1137                .into_iter()
1138                .map(|pkg| pkg.body)
1139                .find_map(|msg| {
1140                    if let ReadRequestReply(args) = msg {
1141                        Some(args)
1142                    } else {
1143                        None
1144                    }
1145                })
1146        }
1147
1148        pub fn send(&mut self, pkg: RaftPackage<TestState>) {
1149            self.out_queue_test.push(pkg);
1150        }
1151
1152        pub fn send_and_tick(&mut self, pkg: RaftPackage<TestState>) {
1153            self.out_queue_test.push(pkg);
1154            self.tick();
1155        }
1156
1157        pub fn package(&self, msg: RaftMessage<TestState>) -> RaftPackage<TestState> {
1158            package(msg)
1159                .from(SystemNodeId::default())
1160                .to(self.server.id.into())
1161                .build()
1162                .unwrap()
1163        }
1164
1165        pub fn recv(&mut self) -> Option<RaftPackage<TestState>> {
1166            self.in_queue_test.pop()
1167        }
1168
1169        pub fn recv_message(&mut self) -> Option<RaftMessage<TestState>> {
1170            self.recv().map(|p| p.body)
1171        }
1172
1173        pub fn reject_append_log(&mut self, id: SystemNodeId) {
1174            let append_log_rejection =
1175                RaftMessage::AppendLogResponse::<TestState>(AppendLogResponseResult {
1176                    term: self.server.term,
1177                    success: false,
1178                });
1179            let pkg = RaftPackageBuilder::default()
1180                .from(id)
1181                .to(self.server.id.into())
1182                .with_message(append_log_rejection)
1183                .build()
1184                .unwrap();
1185
1186            self.send(pkg);
1187        }
1188
1189        pub fn accept_append_log(&mut self, id: SystemNodeId) {
1190            self.answer_append_log(id, true)
1191        }
1192
1193        pub fn answer_append_log(&mut self, id: SystemNodeId, success: bool) {
1194            let append_log_rejection =
1195                RaftMessage::AppendLogResponse::<TestState>(AppendLogResponseResult {
1196                    term: self.server.term,
1197                    success,
1198                });
1199            let pkg = RaftPackageBuilder::default()
1200                .from(id)
1201                .to(self.server.id.into())
1202                .with_message(append_log_rejection)
1203                .build()
1204                .unwrap();
1205
1206            self.send(pkg);
1207        }
1208        pub fn followers_answers_append_log_request(&mut self, success: bool) {
1209            self.empty_queue();
1210            self.followers().into_iter().for_each(|follower| {
1211                self.answer_append_log(follower, success);
1212                self.tick();
1213            });
1214        }
1215
1216        pub fn followers_accept_append_log_request(&mut self) {
1217            self.followers_answers_append_log_request(true)
1218        }
1219
1220        pub fn followers(&mut self) -> Vec<SystemNodeId> {
1221            self.server.cluster.keys().cloned().collect()
1222        }
1223
1224        pub fn current_config(&self) -> UpdateClusterVec {
1225            let mut config: UpdateClusterVec =
1226                self.server.cluster.values().map(|node| node.id.into()).collect();
1227            config.push(self.server.id.into());
1228            config.sort();
1229            config
1230        }
1231
1232        pub fn send_append_log_and_tick(
1233            &mut self,
1234            prev_log_index: LogIndex,
1235            prev_log_term: Term,
1236            data: Option<TestState>,
1237        ) {
1238            let mut entries: Log<TestState> = Default::default();
1239            let _ = entries.push(LogEntry::with_data(prev_log_term, data));
1240
1241            let append_log = AppendLog(AppendLogArgs {
1242                term: 1,
1243                prev_log_index,
1244                prev_log_term,
1245                entries,
1246                leader_commit: 0,
1247            });
1248
1249            self.send_and_tick(self.package(append_log))
1250        }
1251    }
1252
1253    pub struct ContextBuilder<'a> {
1254        server_builder: ServerBuilder<'a>,
1255    }
1256
1257    impl<'a> ContextBuilder<'a> {
1258        pub fn new() -> Self {
1259            Self {
1260                server_builder: ServerBuilder::new(),
1261            }
1262        }
1263
1264        pub fn follower<'b: 'a>(&'b mut self) -> TestContext {
1265            let (mut in_queue_test, mut out_queue_test, mut server, cluster, comm_service) =
1266                self.server_builder.follower();
1267            TestContext {
1268                in_queue_test,
1269                out_queue_test,
1270                server,
1271                cluster,
1272                comm_service,
1273            }
1274        }
1275        pub fn leader<'b: 'a>(&'b mut self) -> TestContext {
1276            let (mut in_queue_test, mut out_queue_test, mut server, cluster, comm_service) =
1277                self.server_builder.leader();
1278            TestContext {
1279                in_queue_test,
1280                out_queue_test,
1281                server,
1282                cluster,
1283                comm_service,
1284            }
1285        }
1286    }
1287
1288    pub type BuilderOutput<'b> = (
1289        FakeDcsInMsgQueue<RaftPackage<TestState>>,
1290        FakeDcsOutMsgQueue<RaftPackage<TestState>>,
1291        RaftService<FakeTimer, TestState>,
1292        LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>,
1293        &'b mut FakeCommunicationService<'b>,
1294    );
1295
1296    pub struct ServerBuilder<'a> {
1297        test_rx: Option<FakeDcsInMsgQueue<RaftPackage<TestState>>>,
1298        comm_service_tx: FakeDcsOutMsgQueue<RaftPackage<TestState>>,
1299        test_tx: Option<FakeDcsOutMsgQueue<RaftPackage<TestState>>>,
1300        comm_service_rx: FakeDcsInMsgQueue<RaftPackage<TestState>>,
1301        communication_service: Option<FakeCommunicationService<'a>>,
1302    }
1303
1304    impl<'a> ServerBuilder<'a> {
1305        #[allow(clippy::new_without_default)]
1306        pub fn new() -> Self {
1307            let (out_rx, out_tx) = FakeDcsMsgQueue::build_pair();
1308            let (in_rx, in_tx) = FakeDcsMsgQueue::build_pair();
1309
1310            Self {
1311                test_rx: Some(out_rx),
1312                comm_service_tx: out_tx,
1313                test_tx: Some(in_tx),
1314                comm_service_rx: in_rx,
1315                communication_service: None,
1316            }
1317        }
1318
1319        pub fn follower<'b: 'a>(&'b mut self) -> BuilderOutput {
1320            let mut timer: FakeTimer = FakeTimer::new();
1321
1322            self.communication_service = Some(FakeCommunicationService::new(
1323                &mut self.comm_service_tx,
1324                &mut self.comm_service_rx,
1325            ));
1326            timer = Stopwatch::from_millis(ELECTION_TIMEOUT);
1327            let mut cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> =
1328                LinearMap::new();
1329            let _ = cluster.insert(
1330                0.into(),
1331                ClusterMember {
1332                    id: SystemNodeId::from(0),
1333                    vote_granted: Abstained,
1334                    match_idx: 0,
1335                    next_idx: 1,
1336                    last_successful_heartbeat: 0,
1337                },
1338            );
1339            let _ = cluster.insert(
1340                2.into(),
1341                ClusterMember {
1342                    id: SystemNodeId::from(2),
1343                    vote_granted: Abstained,
1344                    match_idx: 0,
1345                    next_idx: 1,
1346                    last_successful_heartbeat: 0,
1347                },
1348            );
1349            let mut server = RaftService::new(
1350                timer, Some(cluster.clone()), SystemNodeId::from(1), ELECTION_TIMEOUT
1351            );
1352
1353            server.leader_id = Some(SystemNodeId::from(0));
1354            (
1355                self.test_rx.take().unwrap(),
1356                self.test_tx.take().unwrap(),
1357                server,
1358                cluster,
1359                self.communication_service.as_mut().unwrap(),
1360            )
1361        }
1362
1363        pub fn server_in_cluster<'b: 'a>(&'b mut self) -> BuilderOutput {
1364            self.communication_service = Some(FakeCommunicationService::new(
1365                &mut self.comm_service_tx,
1366                &mut self.comm_service_rx,
1367            ));
1368            let mut timer: FakeTimer = FakeTimer::new();
1369
1370            let cluster = <ServerBuilder<'a>>::initialize_cluster_with_members();
1371            timer = Stopwatch::from_millis(ELECTION_TIMEOUT);
1372            let server = RaftService::new(
1373                timer, Some(cluster.clone()), SystemNodeId::from(0), ELECTION_TIMEOUT
1374            );
1375            (
1376                self.test_rx.take().unwrap(),
1377                self.test_tx.take().unwrap(),
1378                server,
1379                cluster,
1380                self.communication_service.as_mut().unwrap(),
1381            )
1382        }
1383
1384        pub fn candidate<'b: 'a>(&'b mut self) -> BuilderOutput {
1385            self.communication_service = Some(FakeCommunicationService::new(
1386                &mut self.comm_service_tx,
1387                &mut self.comm_service_rx,
1388            ));
1389            let timer: FakeTimer = FakeTimer::new();
1390
1391            let cluster = <ServerBuilder<'a>>::initialize_cluster_with_members();
1392
1393            let mut server = RaftService::new(
1394                timer, Some(cluster.clone()), SystemNodeId::from(0), ELECTION_TIMEOUT
1395            );
1396            let communication_service = self.communication_service.as_mut().unwrap();
1397            server.tick(communication_service); // timeouts, set term = 1, starts election
1398            server.io.timer = Stopwatch::from_millis(CANDIATE_ELECTION_TIMEOUT);
1399            self.test_rx.as_mut().unwrap().pop().unwrap(); // discard request vote
1400            self.test_rx.as_mut().unwrap().pop().unwrap(); // discard request vote
1401
1402            (
1403                self.test_rx.take().unwrap(),
1404                self.test_tx.take().unwrap(),
1405                server,
1406                cluster,
1407                communication_service,
1408            )
1409        }
1410
1411        pub fn leader<'b: 'a>(&'b mut self) -> BuilderOutput {
1412            self.communication_service = Some(FakeCommunicationService::new(
1413                &mut self.comm_service_tx,
1414                &mut self.comm_service_rx,
1415            ));
1416
1417            let mut cluster = <ServerBuilder<'a>>::initialize_cluster_with_members();
1418            let timer: FakeTimer = FakeTimer::new();
1419
1420            let mut server = RaftService::new(
1421                timer, Some(cluster.clone()), SystemNodeId::from(0), ELECTION_TIMEOUT
1422            );
1423            let communication_service = self.communication_service.as_mut().unwrap();
1424            server.tick(communication_service); // timeouts, set term = 1, starts election
1425            self.test_rx.as_mut().unwrap().pop().unwrap(); // discard request vote
1426            self.test_rx.as_mut().unwrap().pop().unwrap(); // discard request vote
1427
1428            let server_id = server.id.clone();
1429            for member in cluster.values().filter(|member| member.id != server_id) {
1430                let grant_vote = RaftMessage::RequestVoteResponse(RequestVoteResponseResult {
1431                    term: 1,
1432                    granted: true,
1433                });
1434                let msg = package(grant_vote)
1435                    .from(member.id)
1436                    .to(server.id)
1437                    .build()
1438                    .unwrap();
1439                self.test_tx.as_mut().unwrap().push(msg);
1440                server.tick(communication_service);
1441            }
1442
1443            for member in cluster.values().filter(|member| member.id != server_id) {
1444                let accept_appendlog = RaftMessage::AppendLogResponse(AppendLogResponseResult {
1445                    term: 1,
1446                    success: true,
1447                });
1448                let msg = package(accept_appendlog)
1449                    .from(member.id)
1450                    .to(server.id)
1451                    .build()
1452                    .unwrap();
1453                self.test_tx.as_mut().unwrap().push(msg);
1454                server.tick(communication_service);
1455            }
1456
1457            for _ in cluster.values().filter(|member| member.id != server.id) {
1458                self.test_rx.as_mut().unwrap().pop(); //removes leader heartbeat
1459            }
1460            (
1461                self.test_rx.take().unwrap(),
1462                self.test_tx.take().unwrap(),
1463                server,
1464                cluster,
1465                communication_service,
1466            )
1467        }
1468
1469        fn initialize_cluster_with_members() -> LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> {
1470            let mut cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> = LinearMap::new();
1471            let _ = cluster.insert(
1472                SystemNodeId::from(1),
1473                ClusterMember {
1474                    id: SystemNodeId::from(1),
1475                    vote_granted: Abstained,
1476                    match_idx: 0,
1477                    next_idx: 1,
1478                    last_successful_heartbeat: 0,
1479                },
1480            );
1481            let _ = cluster.insert(
1482                SystemNodeId::from(2),
1483                ClusterMember {
1484                    id: SystemNodeId::from(2),
1485                    vote_granted: Abstained,
1486                    match_idx: 0,
1487                    next_idx: 1,
1488                    last_successful_heartbeat: 0,
1489                },
1490            );
1491            cluster
1492        }
1493    }
1494}
/// Asserts that `$message` is a `RequestVote` whose `$field` equals `$value`.
///
/// Usage: `assert_message!(term in message == 1)`.
///
/// `RequestVote` and `RequestVoteArgs` must be in scope at the call site.
/// Panics when the field differs from `$value` or when `$message` is any
/// other variant.
#[macro_export]
macro_rules! assert_message {
    ($field:ident in $message:ident == $value:expr) => {{
        match $message {
            RequestVote(RequestVoteArgs { $field, .. }) => assert_eq!($value, $field),
            _ => panic!(
                "expected a RequestVote message while checking `{}`",
                stringify!($field)
            ),
        }
    }};
}