1use std::fmt::Debug;
2
3use log::{debug, error, info, trace};
4use rand::Rng;
5use rand_chacha::rand_core::SeedableRng;
6use serde::{Deserialize, Serialize};
7
8use dcs::communication::messages::{UpdateClusterVec, Header, Package, PackageBuilder};
9use dcs::communication::service::CommunicationService;
10use dcs::coordination::{CoordinationPackage, CoordinationService, Stopwatch};
11use dcs::heapless;
12use dcs::heapless::LinearMap;
13use dcs::nodes::SystemNodeId;
14use dcs::properties::CLUSTER_NODE_COUNT;
15use dcs::rules::measurements::{Measurement, SystemState};
16use dcs::rules::strategy::Rule;
17
18use crate::messages::RaftMessage::{AppendLog, ReadRequestReply, WriteRequestReply};
19use crate::messages::*;
20use crate::metadata::RaftMetadata;
21use crate::server::ElectionVote::Abstained;
22
23
24mod candidate;
25mod follower;
26mod leader;
27
/// Types that can produce a neutral "no operation" value.
///
/// `RaftService::commit_noop` uses this to build the payload of the no-op log entry.
pub trait NoOp {
    /// Builds the no-op value of the implementing type.
    fn noop() -> Self;
}
31
/// Types whose values can be combined pairwise into a single value.
pub trait Merge {
    /// Consumes `self` and `rhs` and returns their combination.
    fn merge(self, rhs: Self) -> Self;
}
35
36pub trait LogData:
37 Clone + Debug + NoOp + Merge + Serialize + From<(SystemNodeId, Measurement)> + Into<SystemState>
38{
39}
40
41impl<T> LogData for T where
42 T: Clone
43 + Debug
44 + NoOp
45 + Merge
46 + Serialize
47 + From<(SystemNodeId, Measurement)>
48 + Into<SystemState>
49{
50}
51
52struct IO<T: Stopwatch> {
53 timer: T,
54 timeout_secs: u64,
55}
56
57impl<T: Stopwatch> IO<T> {
58 fn to_millis(secs: u64) -> u64 {
59 secs * 1000
60 }
61
62 pub fn set_heartbeat_timeout(&mut self) {
63 self.timer = Stopwatch::from_millis(Self::to_millis(self.timeout_secs) / 5);
64 }
65
66 pub fn set_follower_timeout(&mut self) {
67 self.timer = Stopwatch::from_millis(Self::to_millis(self.timeout_secs));
68 }
69
70 pub fn set_candidate_timeout(&mut self) {
71 self.timer = Stopwatch::from_millis((Self::to_millis(self.timeout_secs) * 3) / 4);
72 }
73
74 pub fn get_leasing_time_secs(&mut self) -> u64 {
75 self.timeout_secs
76 }
77}
78
/// Role a node currently plays in the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberState {
    /// The node drives replication and heartbeats.
    Leader,
    /// The node follows a leader; this is the role every node starts in.
    Follower,
    /// The node has started an election and is collecting votes.
    Candidate,
}
85
86trait LeaderBehavior<T: Stopwatch, L: LogData> {
87 fn parse_message(
88 &mut self,
89 package: RaftPackage<L>,
90 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
91 ) -> MemberState;
92
93 fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>);
94}
95
96trait CandidateBehavior<T: Stopwatch, L: LogData> {
97 fn parse_message(
98 &mut self,
99 package: RaftPackage<L>,
100 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
101 ) -> MemberState;
102
103 fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>);
104}
105
106trait FollowerBehavior<T: Stopwatch, L: LogData> {
107 fn parse_message(
108 &mut self,
109 package: RaftPackage<L>,
110 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
111 ) -> MemberState;
112
113 fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>);
114}
115
116pub type RaftPackage<T> = Package<SystemNodeId, RaftMessage<T>>;
117
118pub struct RaftService<T: Stopwatch, L: LogData> {
120 io: IO<T>,
121 term: Term,
122 id: SystemNodeId,
123 leader_id: Option<SystemNodeId>,
124 commit_index: LogIndex,
125 last_applied: LogIndex,
126 current_role: MemberState,
127 log: Log<L>,
128 cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>,
129 pub members_already_setup: bool,
130 next_config: Option<UpdateClusterVec>,
131}
132
133impl<T: Stopwatch, L: LogData>
134 CoordinationService<
135 T,
136 RaftMessage<L>,
137 LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>,
138 RaftMetadata,
139 > for RaftService<T, L>
140{
141 fn new(id: SystemNodeId, metadata: RaftMetadata) -> Self {
151 debug!(
152 "Intializaing Raft Coordination Service. Node timeout: {:?}s",
153 metadata.timeout
154 );
155 let timer = Stopwatch::from_millis(metadata.timeout * 1000);
156 Self::new(timer, None, id.into(), metadata.timeout)
157 }
158
159 fn leader(&self) -> Option<SystemNodeId> {
160 self.leader_id
161 }
162
163 fn get_state(&self) -> SystemState {
164 debug!("Raft state: {:?}", self.log);
165 let mut state = SystemState::default();
166 for entry in self.log.iter().filter_map(|entry| entry.data.clone()) {
167 let state_entry : SystemState = entry.into();
168 state.extend(state_entry)
169 }
170 return state
171 }
172
173 fn get_current_rule(&self) -> Option<Rule> {
174 self.log.iter().filter_map(|entry| entry.rule).last()
175 }
176
177 fn update_rule(
178 &mut self,
179 communication_service: &mut dyn CommunicationService<Package<SystemNodeId, RaftMessage<L>>>,
180 new_rule: Rule
181 ) {
182 let message = RaftMessage::WriteRequest(WriteRequestArgs::with_rule(self.id, new_rule));
183 let package = package(message).from(self.id).to(self.id).build().unwrap();
184 self.process(communication_service, Some(package), None)
185 }
186
187 fn update_members(
188 &mut self,
189 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
190 new_config: UpdateClusterVec,
191 ) {
192 let message = RaftMessage::ConfigChange(new_config);
193 let package = package(message).from(self.id).to(self.id).build().unwrap();
194 self.process(communication_service, Some(package), None)
195 }
196
197 fn update_state(
198 &mut self,
199 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
200 measurement: Measurement,
201 ) {
202 if !self.is_leader() && self.leader_id.is_some() {
203 let measurement = RaftMessage::WriteRequest(WriteRequestArgs::with_measurement(self.id.into(), measurement));
204 let package = self
205 .build_message(measurement, self.leader_id.unwrap().into())
206 .unwrap();
207 communication_service.push(package);
208 } else {
209 let message = RaftMessage::WriteRequest(WriteRequestArgs::with_measurement(self.id, measurement));
210 let package = self.build_message(message, self.id).unwrap();
211 self.process(communication_service, Some(package), None)
212 }
213 }
214
215 fn process(
216 &mut self,
217 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
218 package: Option<CoordinationPackage<RaftMessage<L>>>,
219 members: LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>,
220 ) {
221 self.process(communication_service, package, Some(members))
222 }
223}
224
225
226impl<T: Stopwatch, L: LogData> RaftService<T, L> {
227 pub(crate) fn new(
228 timer: T,
229 cluster: Option<LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>>,
230 id: SystemNodeId,
231 timeout_secs: u64,
232 ) -> Self {
233 RaftService {
234 id,
235 term: 0,
236 leader_id: None,
237 commit_index: 0,
238 last_applied: 0,
239 io: IO { timer, timeout_secs, },
240 log: Default::default(),
241 members_already_setup: cluster.is_some(),
242 current_role: MemberState::Follower,
243 cluster: cluster.unwrap_or_default(),
244 next_config: None,
245 }
246 }
247
248 pub fn is_leader(&self) -> bool {
249 self.current_role == MemberState::Leader
250 }
251
252 pub fn process(
253 &mut self,
254 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
255 message: Option<Package<SystemNodeId, RaftMessage<L>>>,
256 members: Option<LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>>,
257 ) {
258 if !self.members_already_setup {
259 self.set_up_members_for_the_first_time(members);
260 }
261 if let Some(msg) = message.clone() {
262 if msg.header.from != msg.header.to {
263 debug!("[RAFT] ({}) ==|{}|==> ({})", msg.header.from, msg.body, msg.header.to);
264 }
265 }
266 if self.members_already_setup {
267 trace!("Current log status: {:?}", self.log);
268 self.io.timer.update();
269 if let Some(package) = message {
270 self.parse_message(package, communication_service);
271 } else {
272 trace!("No message received, ticking!");
273 }
274
275 self.after_tick(communication_service);
276 }
277 }
278
279 fn set_up_members_for_the_first_time(
280 &mut self,
281 members: Option<LinearMap<SystemNodeId, RaftMetadata, CLUSTER_NODE_COUNT>>,
282 ) {
283 if let Some(members) = members {
284 debug!("Setting up members for the first time, cluster: {:?}", members);
285 self.cluster = members
286 .iter()
287 .filter(|(&id, _)| id != self.id)
288 .map(|(&id, _metadata)| {
289 (
290 id,
291 ClusterMember {
292 id: id.into(),
293 vote_granted: ElectionVote::Abstained,
294 next_idx: 1,
295 match_idx: 0,
296 last_successful_heartbeat: 0,
297 },
298 )
299 })
300 .collect();
301 self.members_already_setup = true;
302 }
303 }
304
305 pub fn tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>) {
306 let message = communication_service.pop();
307 self.process(communication_service, message, None)
308 }
309
310 fn parse_message(
311 &mut self,
312 package: RaftPackage<L>,
313 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
314 ) {
315 debug!("Processing package {:?} as {:?}.", package, self.current_role);
316
317 self.current_role = match self.current_role {
318 MemberState::Leader => (self as &mut dyn LeaderBehavior<T, L>)
319 .parse_message(package, communication_service),
320 MemberState::Follower => (self as &mut dyn FollowerBehavior<T, L>)
321 .parse_message(package, communication_service),
322 MemberState::Candidate => (self as &mut dyn CandidateBehavior<T, L>)
323 .parse_message(package, communication_service),
324 };
325 }
326
327 fn update_ttl(&mut self, header: &Header<SystemNodeId>) {
328 if let Some(node) = self.cluster.get_mut(&header.from) {
329 node.last_successful_heartbeat = self.io.timer.current_time_as_secs();
330 }
331 }
332
333 fn start_election(
334 &mut self,
335 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
336 ) {
337 self.term += 1;
338 self.current_role = MemberState::Candidate;
339 self.clean_state_from_previous_election();
340 self.send_election_messages(communication_service);
341 self.io.set_candidate_timeout();
342 }
343
344 fn clean_state_from_previous_election(&mut self) {
345 self.cluster
346 .values_mut()
347 .for_each(|member| member.vote_granted = Abstained);
348 }
349
350 fn send_election_messages(
351 &mut self,
352 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
353 ) {
354 info!("Node #{} starting leader election.", self.id);
355 debug!("Election started in cluster: {:?}.", self.cluster);
356 let cluster = &self.cluster;
357 let term = self.term;
358 let id = self.id;
359 cluster
360 .values()
361 .into_iter()
362 .filter(|member: &&ClusterMember| member.id != id)
363 .for_each(|member| {
364 let msg = RaftPackageBuilder::default()
365 .from(self.id)
366 .to(member.id)
367 .with_message(RaftMessage::RequestVote(RequestVoteArgs {
368 term,
369 prev_log_index: self.get_last_log_index(),
370 prev_log_term: self.get_last_log_term(),
371 }))
372 .build()
373 .ok();
374 if let Some(pkg) = msg {
375 communication_service.push(pkg)
376 }
377 });
378 }
379
380 pub(crate) fn build_message(
381 &self,
382 msg: RaftMessage<L>,
383 to: SystemNodeId,
384 ) -> Option<Package<SystemNodeId, RaftMessage<L>>> {
385 package(msg)
386 .from(self.id)
387 .to(to)
388 .build()
389 .ok()
390 }
391
392 pub(crate) fn broadcast_message(
393 &mut self,
394 message: RaftMessage<L>,
395 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
396 ) {
397 let my_id = self.id;
398 let member_ids = self
399 .cluster
400 .values()
401 .map(|m| m.id)
402 .filter(|member_id| member_id != &my_id);
403 for member in member_ids {
404 self.send_message_to(message.clone(), member, communication_service)
405 }
406 }
407
408 fn send_message_to(
409 &self,
410 message: RaftMessage<L>,
411 member: SystemNodeId,
412 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
413 ) {
414 if let Some(msg) = self.build_message(message, member.into()) {
415 communication_service.push(msg)
416 }
417 }
418
419 pub(crate) fn reset_next_index(&mut self) {
420 let next_log_idx = self.get_last_log_index() + 1;
421 self.cluster
422 .iter_mut()
423 .for_each(|(_id, member)| member.next_idx = next_log_idx)
424 }
425
426 pub(crate) fn reset_match_index(&mut self) {
427 self.cluster
428 .iter_mut()
429 .for_each(|(_id, member)| member.match_idx = 0)
430 }
431
432 pub(crate) fn commit_noop(
433 &mut self,
434 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
435 ) {
436 let prev_log_index = self.get_last_log_index();
437 let prev_log_term = self.get_last_log_term();
438 let noop: LogEntry<L> = LogEntry::with_data(self.term, Some(L::noop()));
439 let _ = self.log.push(noop.clone());
440 let mut entries = Log::new();
441 let _ = entries.push(noop);
442 self.broadcast_message(
443 AppendLog(AppendLogArgs {
444 term: self.term,
445 prev_log_index,
446 prev_log_term,
447 entries,
448 leader_commit: self.commit_index,
449 }),
450 communication_service,
451 );
452 }
453
454 pub fn reject_read_request(
455 &mut self,
456 header: Header<SystemNodeId>,
457 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
458 ) {
459 let reject_read_request = ReadRequestReply(ReadRequestReplyArgs::fail(self.leader_id));
460 communication_service.push(
461 self.build_message(reject_read_request, header.from)
462 .unwrap(),
463 )
464 }
465
466 pub fn reject_write_request(
467 &mut self,
468 header: Header<SystemNodeId>,
469 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
470 ) {
471 let msg = self
472 .build_message(WriteRequestReply(false, self.leader_id), header.from)
473 .unwrap();
474
475 communication_service.push(msg)
476 }
477
478 pub(crate) fn get_last_log_term(&self) -> Term {
479 if let Some(entry) = self.log.last() {
480 entry.term
481 } else {
482 0
483 }
484 }
485
486 pub(crate) fn get_last_log_index(&self) -> LogIndex {
487 self.log.last_index() as LogIndex
488 }
489
490 fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>) {
491 if self.log.capacity() < 0.25 {
492 info!("Creating snapshot with commit-idx {}.", self.commit_index);
493 debug!("Log to snapshot: {:?}", self.log);
494 self.log.snapshot(self.commit_index);
495 debug!("Snapshot result: {:?}", self.log);
496 }
497 match self.current_role {
498 MemberState::Leader => (self as &mut dyn LeaderBehavior<T, L>).after_tick(communication_service),
499 MemberState::Follower => (self as &mut dyn FollowerBehavior<T, L>).after_tick(communication_service),
500 MemberState::Candidate => (self as &mut dyn CandidateBehavior<T, L>).after_tick(communication_service),
501 }
502 }
503
504 fn send_heartbeat_to_followers(
505 &mut self,
506 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
507 ) {
508 let message = self.make_heartbeat();
509 let my_id = self.id;
510 let heartbeat_ttl = self.io.get_leasing_time_secs() / 2;
511 let now = self.io.timer.current_time_as_secs();
512 let member_ids = self
513 .cluster
514 .values()
515 .filter(|m| now - m.last_successful_heartbeat > heartbeat_ttl)
516 .map(|m| m.id)
517 .filter(|member_id| member_id != &my_id);
518 for member in member_ids {
519 self.send_message_to(message.clone(), member, communication_service)
520 }
521 }
522
523 fn make_heartbeat(&mut self) -> RaftMessage<L> {
524 let prev_log_index = self.get_last_log_index();
525 let prev_log_term = self.get_last_log_term();
526 RaftMessage::AppendLog(AppendLogArgs {
527 term: self.term,
528 prev_log_index,
529 prev_log_term,
530 entries: Log::new(),
531 leader_commit: self.commit_index,
532 })
533 }
534
535 fn send_install_snapshot_to(
536 &self,
537 member: &ClusterMember,
538 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
539 ) {
540 let snapshot = self.log.get(1).unwrap();
541 let args = InstallSnapshotArgs {
542 term: self.term,
543 leader_id: self.id,
544 last_included_index: self.log.last_included_index(),
545 last_included_term: snapshot.term,
546 data: snapshot.clone(),
547 };
548 self.send_message_to(
549 RaftMessage::InstallSnapshot(args),
550 member.id,
551 communication_service,
552 );
553 }
554
555 fn send_next_entry_to(
556 &self,
557 member: &ClusterMember,
558 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
559 ) {
560 let (prev_log_index, prev_log_term) = if member.next_idx > 1 {
561 let idx = member.next_idx - 1;
562 let term = self
563 .log
564 .get(idx as LogIndex)
565 .map(|entry| entry.term)
566 .unwrap_or_default();
567 (idx as LogIndex, term)
568 } else {
569 (0, 0)
570 };
571 trace!("Send next entry to {member:?} with previous-log-idx {prev_log_index} and term {prev_log_term}");
572 if let Some(current_entry) = self.log.get(member.next_idx) {
573 let mut entries = Log::new();
574 let _ = entries.push(current_entry.clone());
575 let append_log_msg = AppendLog(AppendLogArgs {
576 term: self.term,
577 prev_log_index,
578 prev_log_term,
579 entries,
580 leader_commit: self.commit_index,
581 });
582 trace!("Sending next entry to {member:?} entry: {append_log_msg:?}");
583 self.send_message_to(append_log_msg, member.id, communication_service);
584 }
585 }
586
587 pub fn current_config(&self) -> UpdateClusterVec {
588 let mut current_config = UpdateClusterVec::from_iter(self.cluster.values().map(|v| v.id.into()));
589 current_config.push(self.id.into());
590 current_config.sort();
591 current_config
592 }
593
594 pub fn update_config(&mut self, new_cluster: &UpdateClusterVec) {
595 for member_id in new_cluster {
596 if !self.cluster.contains_key(&member_id) && *member_id != self.id {
597 let new_member = ClusterMember {
598 id: *member_id,
599 vote_granted: ElectionVote::Abstained,
600 next_idx: 1,
601 match_idx: 0,
602 last_successful_heartbeat: 0,
603 };
604 debug!("Adding new member to cluster {new_member:?}");
605 if self.cluster.insert(*member_id, new_member).is_err() {
606 error!("Couldn't insert node #{} into cluster data.", member_id)
607 }
608 }
609 }
610
611 let mut members_to_delete = heapless::Vec::<SystemNodeId, CLUSTER_NODE_COUNT>::new();
612 for system_id in self.cluster.keys() {
613 if !new_cluster.contains(&system_id) {
614 members_to_delete.push(*system_id);
615 }
616 }
617 members_to_delete.into_iter().for_each(|member| {
618 self.cluster.remove(&member);
619 });
620 }
621
622 pub fn update_commit_index(&mut self) {
623 let mut commit_indexes: Vec<LogIndex> = self
624 .cluster
625 .values()
626 .map(|member| member.next_idx.checked_sub(1).unwrap_or_default())
627 .collect();
628 commit_indexes.sort_by(|x, y| x.cmp(y).reverse());
629 let mut latest_entry_shared = commit_indexes.first().cloned();
630 for index in commit_indexes.iter() {
631 let mut count = commit_indexes
632 .iter()
633 .filter(|other_index| other_index <= &index)
634 .count();
635 if self.commit_index <= *index {
636 count += 1
637 }
638 if count >= (self.cluster.len() + 1) / 2 {
639 latest_entry_shared = Some(*index);
640 break;
641 }
642 }
643 if let Some(index) = latest_entry_shared {
644 if index >= self.commit_index {
645 self.commit_index = index;
646 }
647 }
648 }
649
650 pub fn replicate_entry(
651 &mut self,
652 entry: LogEntry<L>,
653 communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
654 ) {
655 let mut entries = Log::new();
656 let _ = entries.push(entry);
657 let append_log_msg = RaftMessage::AppendLog(AppendLogArgs {
658 term: self.term,
659 prev_log_index: self.get_last_log_index(),
660 prev_log_term: self.get_last_log_term(),
661 entries,
662 leader_commit: self.commit_index,
663 });
664 self.broadcast_message(append_log_msg, communication_service);
665 }
666
667 fn become_follower_of(&mut self, leader_id: SystemNodeId, new_term: Term) {
668 self.io.set_follower_timeout();
669 self.term = new_term;
670 debug!("Updating leader_id to: {}", leader_id);
671 self.leader_id = Some(leader_id);
672 }
673}
674
675fn package<T: LogData>(msg: RaftMessage<T>) -> RaftPackageBuilder<T> {
676 RaftPackageBuilder::default().with_message(msg)
677}
678
679#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
680pub enum ElectionVote {
681 Granted,
682 Against,
683 Abstained,
684}
685
686#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
687pub struct ClusterMember {
688 pub id: SystemNodeId,
689 pub vote_granted: ElectionVote,
690 pub next_idx: LogIndex,
691 pub match_idx: LogIndex,
692 pub last_successful_heartbeat: u64,
693}
694
695#[cfg(test)]
696#[allow(non_snake_case)]
697mod test_utils {
698 extern crate log;
699 extern crate std;
700
701 use core::fmt::Debug;
702 use std::println;
703 use std::sync::atomic::{AtomicUsize, Ordering};
704 use std::sync::mpsc;
705 use std::sync::mpsc::{Receiver, Sender};
706
707 use serde::{Deserialize, Serialize};
708
709 use dcs::communication::connection::{InMsgQueue, OutMsgQueue, Readable, Writable};
710 use dcs::communication::messages::{PackageBuilder};
711 use dcs::communication::service::CommunicationService;
712 use dcs::coordination::Stopwatch;
713 use dcs::nodes::SystemNodeId;
714 use dcs::rules::measurements::{Measurement, SystemState};
715
716 use crate::messages::RaftMessage::RequestVote;
717 use crate::messages::{RaftMessage, WriteRequestArgs};
718 use crate::server::{package, LogData, Merge, NoOp, RaftPackage, RaftService};
719 use crate::{CANDIATE_ELECTION_TIMEOUT, ELECTION_TIMEOUT};
720
/// Process-wide tick counter shared by all `FakeTimer`s; it backs `current_time_as_secs`.
static CLOCK: AtomicUsize = AtomicUsize::new(0);
722
/// Deterministic stopwatch for tests: time only advances when `update` is called.
pub struct FakeTimer {
    /// Number of updates this timer has seen.
    clock: u32,
    /// Remaining updates until the timer reports a timeout.
    timeout: u32,
}
727
728 impl FakeTimer {
729 fn update_timer(&mut self) {
730 CLOCK.fetch_add(1, Ordering::Relaxed);
731 self.clock += 1;
732 if self.timeout > 0 {
733 self.timeout -= 1;
734 }
735 }
736 pub fn new() -> Self
737 where
738 Self: Sized,
739 {
740 FakeTimer {
741 clock: 0,
742 timeout: 0,
743 }
744 }
745 }
746
747 impl Stopwatch for FakeTimer {
748 fn from_millis(secs: u64) -> Self {
749 Self {
750 clock: 0 as u32,
751 timeout: secs as u32,
752 }
753 }
754
755 fn restart(&mut self) {}
756
757 fn is_timeout(&self) -> bool {
758 self.timeout == 0
759 }
760
761 fn current_time_as_secs(&mut self) -> u64 {
762 CLOCK.load(Ordering::Relaxed) as u64
763 }
764
765 fn as_secs(&self) -> u64 { unreachable!() }
766
767 fn update(&mut self) {
768 self.update_timer()
769 }
770 }
771
772 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
773 pub struct TestState(pub u32, pub u32);
774
775 impl From<(SystemNodeId, Measurement)> for TestState {
776 fn from((_, m): (SystemNodeId, Measurement)) -> Self {
777 TestState(m.value as u32, 0)
778 }
779 }
780
781 impl From<TestState> for SystemState {
782 fn from(_: TestState) -> Self {
783 unreachable!()
784 }
785 }
786
787 impl NoOp for TestState {
788 fn noop() -> Self {
789 Self(0, 0)
790 }
791 }
792
793 impl Merge for TestState {
794 fn merge(self, rhs: Self) -> Self {
795 Self(self.0 + rhs.0, self.1 + rhs.1)
796 }
797 }
798
799 pub fn assert_message_is_vote_request<T: LogData + Debug>(actual_msg: &RaftMessage<T>) {
800 match actual_msg {
801 RequestVote { .. } => success(),
802 _ => fail(),
803 }
804 }
805
/// Fails the current test by panicking.
pub fn fail() {
    unreachable!()
}
809
/// Marks a test branch as passed; never panics.
pub fn success() {
    assert!(true)
}
813
814 pub fn timeout_candidate<T: LogData + Debug>(
815 candidate: &mut RaftService<FakeTimer, T>,
816 communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
817 ) {
818 tick_times(CANDIATE_ELECTION_TIMEOUT, candidate, communication_service);
819 }
820
821 pub fn almost_timeout_follower<T: LogData + Debug>(
822 candidate: &mut RaftService<FakeTimer, T>,
823 communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
824 ) {
825 tick_times(ELECTION_TIMEOUT - 1, candidate, communication_service);
826 }
827
828 pub fn timeout_follower<T: LogData + Debug>(
829 candidate: &mut RaftService<FakeTimer, T>,
830 communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
831 ) {
832 tick_times(ELECTION_TIMEOUT, candidate, communication_service);
833 }
834
835 fn tick_times<T: LogData + Debug>(
836 times: u64,
837 candidate: &mut RaftService<FakeTimer, T>,
838 communication_service: &mut dyn CommunicationService<RaftPackage<T>>,
839 ) {
840 for _i in 0..times {
841 candidate.tick(communication_service);
842 }
843 }
844
845 pub fn read_msg_sent_by_server<T: LogData + Debug>(
846 in_queue_test: &mut dyn InMsgQueue<RaftPackage<T>>,
847 ) -> RaftMessage<T> {
848 let package = in_queue_test
849 .pop()
850 .expect("Message wasn't received by server");
851 package.body
852 }
853
854 pub fn get_destination_of_msg<T: LogData + Debug>(
855 in_queue_test: &mut dyn InMsgQueue<RaftPackage<T>>,
856 ) -> SystemNodeId {
857 in_queue_test.pop().unwrap().header.to
858 }
859
/// Namespace for building connected in/out test queues.
pub struct FakeDcsMsgQueue;
861
862 impl FakeDcsMsgQueue {
863 pub fn build_pair() -> (
864 FakeDcsInMsgQueue<RaftPackage<TestState>>,
865 FakeDcsOutMsgQueue<RaftPackage<TestState>>,
866 ) {
867 let (tx, rx) = mpsc::channel::<RaftPackage<TestState>>();
868 (
869 FakeDcsInMsgQueue { in_channel: rx },
870 FakeDcsOutMsgQueue { out_channel: tx },
871 )
872 }
873 }
874
875 pub struct FakeDcsOutMsgQueue<T: Writable + Debug> {
876 out_channel: Sender<T>,
877 }
878
879 #[derive(Debug)]
880 pub struct FakeDcsInMsgQueue<T: Readable + Debug> {
881 in_channel: Receiver<T>,
882 }
883
884 impl<T: Writable + Send + Debug> OutMsgQueue<T> for FakeDcsOutMsgQueue<T> {
885 fn push(&mut self, msg: T) {
886 println!("PUSH {:?}", msg);
887 let _ = self.out_channel.send(msg);
888 }
889 }
890
891 impl<T: Readable + Debug> InMsgQueue<T> for FakeDcsInMsgQueue<T> {
892 fn pop(&mut self) -> Option<T> {
893 match self.in_channel.try_recv() {
894 Ok(msg) => {
895 println!("POP {:?}", msg);
896 Some(msg)
897 }
898 Err(_) => None,
899 }
900 }
901 }
902
903 pub struct FakeCommunicationService<'a> {
904 out_queue: &'a mut dyn OutMsgQueue<RaftPackage<TestState>>,
905 in_queue: &'a mut dyn InMsgQueue<RaftPackage<TestState>>,
906 }
907
908 impl<'a> FakeCommunicationService<'a> {
909 pub fn new(
910 out_queue: &'a mut dyn OutMsgQueue<RaftPackage<TestState>>,
911 in_queue: &'a mut dyn InMsgQueue<RaftPackage<TestState>>,
912 ) -> Self {
913 Self {
914 out_queue,
915 in_queue,
916 }
917 }
918 }
919
920 impl<'a> OutMsgQueue<RaftPackage<TestState>> for FakeCommunicationService<'a> {
921 fn push(&mut self, msg: RaftPackage<TestState>) {
922 self.out_queue.push(msg);
923 }
924 }
925
926 impl<'a> InMsgQueue<RaftPackage<TestState>> for FakeCommunicationService<'a> {
927 fn pop(&mut self) -> Option<RaftPackage<TestState>> {
928 self.in_queue.pop()
929 }
930 }
931
932 impl<'a> CommunicationService<RaftPackage<TestState>> for FakeCommunicationService<'a> {}
933
934 pub fn configure_read_only_request<T: LogData + Debug>(id: SystemNodeId) -> RaftPackage<T> {
935 package(RaftMessage::ReadRequest)
936 .from(0.into())
937 .to(id)
938 .build()
939 .unwrap()
940 }
941
942 pub fn send_read_request(
943 server: &mut RaftService<FakeTimer, TestState>,
944 out_queue_test: &mut dyn OutMsgQueue<RaftPackage<TestState>>,
945 communication_service: &mut dyn CommunicationService<RaftPackage<TestState>>,
946 ) {
947 let client_request = configure_read_only_request(server.id);
948 out_queue_test.push(client_request);
949 server.tick(communication_service);
950 }
951
952 pub fn send_write_request(
953 server: &mut RaftService<FakeTimer, TestState>,
954 out_queue_test: &mut dyn OutMsgQueue<RaftPackage<TestState>>,
955 data: Measurement,
956 communication_service: &mut dyn CommunicationService<RaftPackage<TestState>>,
957 ) {
958 let server_id = server.id;
959 let client_request = configure_write_request(server_id, data);
960 out_queue_test.push(client_request);
961 server.tick(communication_service);
962 }
963
964 pub fn configure_write_request(
965 id: SystemNodeId,
966 data: Measurement,
967 ) -> RaftPackage<TestState> {
968 let current_log = RaftMessage::WriteRequest(WriteRequestArgs::with_measurement(id, data));
969 package(current_log)
970 .from(2.into())
971 .to(id)
972 .build()
973 .unwrap()
974 }
975}
976
977#[cfg(test)]
978pub mod test_server_builder {
979 extern crate log;
980 extern crate std;
981
982 use std::borrow::BorrowMut;
983 use std::collections::HashMap;
984 use std::iter::FromFn;
985 use std::ops::DerefMut;
986
987 use dcs::communication::connection::{InMsgQueue, OutMsgQueue};
988 use dcs::communication::messages::PackageBuilder;
989 use dcs::communication::service::CommunicationService;
990 use dcs::coordination::Stopwatch;
991 use dcs::heapless;
992 use dcs::heapless::LinearMap;
993 use dcs::nodes::SystemNodeId;
994 use dcs::rules::measurements::Measurement;
995 use dcs::rules::measurements::ClusterType::TEMPERATURE;
996
997 use crate::messages::*;
998 use crate::server::test_utils::*;
999 use crate::server::ElectionVote::Abstained;
1000 use crate::server::*;
1001 use crate::server::{package, CLUSTER_NODE_COUNT};
1002 use crate::state::RaftState;
1003 use crate::RaftMessage::*;
1004 use crate::{RaftPackageBuilder, CANDIATE_ELECTION_TIMEOUT, ELECTION_TIMEOUT};
1005
1006 pub struct TestContext<'a> {
1007 in_queue_test: FakeDcsInMsgQueue<RaftPackage<TestState>>,
1008 out_queue_test: FakeDcsOutMsgQueue<RaftPackage<TestState>>,
1009 server: RaftService<FakeTimer, TestState>,
1010 cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>,
1011 comm_service: &'a mut FakeCommunicationService<'a>,
1012 }
1013
1014 impl<'a> TestContext<'a> {
1015 pub fn get_node_mut(&mut self) -> &mut RaftService<FakeTimer, TestState> {
1016 &mut self.server
1017 }
1018
1019 pub fn get_node(&self) -> &RaftService<FakeTimer, TestState> {
1020 &self.server
1021 }
1022 pub fn term(&self) -> Term {
1023 self.server.term
1024 }
1025 pub fn tick(&mut self) {
1026 let server = &mut self.server;
1027 let mut comm_service = self.comm_service.deref_mut();
1028 server.tick(comm_service)
1029 }
1030
1031 pub fn cluster_size(&self) -> usize {
1032 self.server.cluster.len() + 1
1033 }
1034
1035 pub fn cluster(&self) -> LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> {
1036 self.server.cluster.clone()
1037 }
1038
1039 pub fn id(&self) -> SystemNodeId {
1040 self.server.id
1041 }
1042
1043 pub fn commit_idx(&self) -> LogIndex {
1044 self.server.commit_index
1045 }
1046
1047 pub fn send_read_request(&mut self) {
1048 let server_id = self.server.id;
1049 let client_request = configure_read_only_request(server_id);
1050 self.send(client_request);
1051 }
1052
1053 pub fn send_read_request_and_tick(&mut self) {
1054 self.send_read_request();
1055 self.tick();
1056 }
1057
1058 pub fn send_write_request(&mut self, data: Measurement) {
1059 let server_id = self.server.id;
1060 let client_request = configure_write_request(server_id, data);
1061 self.send(client_request);
1062 }
1063
1064 pub fn send_write_request_and_tick(&mut self, data: Measurement) {
1065 self.send_write_request(data);
1066 self.tick();
1067 }
1068
1069 pub fn empty_queue(&mut self) {
1070 let _ = self.messages();
1071 }
1072
1073 pub fn messages(&mut self) -> std::vec::Vec<RaftPackage<TestState>> {
1074 let mut messages = vec![];
1075 let mut pkg = self.recv();
1076 while pkg.is_some() {
1077 messages.push(pkg.unwrap());
1078 pkg = self.recv();
1079 }
1080 messages
1081 }
1082
1083 pub fn append_log_requests(&mut self) -> Vec<AppendLogArgs<TestState>> {
1084 let mut messages = self.messages();
1085 let messages: Vec<_> = messages
1086 .into_iter()
1087 .filter_map(|pkg| {
1088 if let RaftMessage::AppendLog(args) = pkg.body {
1089 Some(args)
1090 } else {
1091 None
1092 }
1093 })
1094 .collect();
1095 messages
1096 }
1097
1098 pub fn count_heartbeats(&mut self) -> usize {
1099 let queue = &mut self.in_queue_test;
1100 let iter = std::iter::from_fn(|| queue.pop());
1101 iter.map(|pkg| pkg.body)
1102 .filter(|msg| matches!(msg, RaftMessage::AppendLog(args)))
1103 .count()
1104 }
1105
1106 pub fn entry_was_broadcasted(&mut self, entry: LogEntry<TestState>) -> bool {
1107 let mut appendlog_messages: Vec<RaftPackage<TestState>> = self
1108 .messages()
1109 .into_iter()
1110 .filter(|pkg| matches!(pkg.get_message(), RaftMessage::AppendLog(args)))
1111 .collect();
1112 let mut expected_entries = Log::new();
1113 expected_entries.push(entry);
1114
1115 let mut count = 0;
1116 for member in self.cluster.keys() {
1117 let broadcasted = appendlog_messages
1118 .iter()
1119 .filter(|pkg| pkg.header.to == *member)
1120 .filter(|pkg| {
1121 if let AppendLog(args) = pkg.get_message() {
1122 args.entries == expected_entries
1123 } else {
1124 false
1125 }
1126 })
1127 .count();
1128 if broadcasted != 0 {
1129 count += 1
1130 }
1131 }
1132 count == self.cluster.len()
1133 }
1134
1135 pub fn get_read_response(&mut self) -> Option<ReadRequestReplyArgs<TestState>> {
1136 self.messages()
1137 .into_iter()
1138 .map(|pkg| pkg.body)
1139 .find_map(|msg| {
1140 if let ReadRequestReply(args) = msg {
1141 Some(args)
1142 } else {
1143 None
1144 }
1145 })
1146 }
1147
1148 pub fn send(&mut self, pkg: RaftPackage<TestState>) {
1149 self.out_queue_test.push(pkg);
1150 }
1151
1152 pub fn send_and_tick(&mut self, pkg: RaftPackage<TestState>) {
1153 self.out_queue_test.push(pkg);
1154 self.tick();
1155 }
1156
1157 pub fn package(&self, msg: RaftMessage<TestState>) -> RaftPackage<TestState> {
1158 package(msg)
1159 .from(SystemNodeId::default())
1160 .to(self.server.id.into())
1161 .build()
1162 .unwrap()
1163 }
1164
1165 pub fn recv(&mut self) -> Option<RaftPackage<TestState>> {
1166 self.in_queue_test.pop()
1167 }
1168
1169 pub fn recv_message(&mut self) -> Option<RaftMessage<TestState>> {
1170 self.recv().map(|p| p.body)
1171 }
1172
1173 pub fn reject_append_log(&mut self, id: SystemNodeId) {
1174 let append_log_rejection =
1175 RaftMessage::AppendLogResponse::<TestState>(AppendLogResponseResult {
1176 term: self.server.term,
1177 success: false,
1178 });
1179 let pkg = RaftPackageBuilder::default()
1180 .from(id)
1181 .to(self.server.id.into())
1182 .with_message(append_log_rejection)
1183 .build()
1184 .unwrap();
1185
1186 self.send(pkg);
1187 }
1188
1189 pub fn accept_append_log(&mut self, id: SystemNodeId) {
1190 self.answer_append_log(id, true)
1191 }
1192
1193 pub fn answer_append_log(&mut self, id: SystemNodeId, success: bool) {
1194 let append_log_rejection =
1195 RaftMessage::AppendLogResponse::<TestState>(AppendLogResponseResult {
1196 term: self.server.term,
1197 success,
1198 });
1199 let pkg = RaftPackageBuilder::default()
1200 .from(id)
1201 .to(self.server.id.into())
1202 .with_message(append_log_rejection)
1203 .build()
1204 .unwrap();
1205
1206 self.send(pkg);
1207 }
1208 pub fn followers_answers_append_log_request(&mut self, success: bool) {
1209 self.empty_queue();
1210 self.followers().into_iter().for_each(|follower| {
1211 self.answer_append_log(follower, success);
1212 self.tick();
1213 });
1214 }
1215
1216 pub fn followers_accept_append_log_request(&mut self) {
1217 self.followers_answers_append_log_request(true)
1218 }
1219
1220 pub fn followers(&mut self) -> Vec<SystemNodeId> {
1221 self.server.cluster.keys().cloned().collect()
1222 }
1223
1224 pub fn current_config(&self) -> UpdateClusterVec {
1225 let mut config: UpdateClusterVec =
1226 self.server.cluster.values().map(|node| node.id.into()).collect();
1227 config.push(self.server.id.into());
1228 config.sort();
1229 config
1230 }
1231
1232 pub fn send_append_log_and_tick(
1233 &mut self,
1234 prev_log_index: LogIndex,
1235 prev_log_term: Term,
1236 data: Option<TestState>,
1237 ) {
1238 let mut entries: Log<TestState> = Default::default();
1239 let _ = entries.push(LogEntry::with_data(prev_log_term, data));
1240
1241 let append_log = AppendLog(AppendLogArgs {
1242 term: 1,
1243 prev_log_index,
1244 prev_log_term,
1245 entries,
1246 leader_commit: 0,
1247 });
1248
1249 self.send_and_tick(self.package(append_log))
1250 }
1251 }
1252
1253 pub struct ContextBuilder<'a> {
1254 server_builder: ServerBuilder<'a>,
1255 }
1256
1257 impl<'a> ContextBuilder<'a> {
1258 pub fn new() -> Self {
1259 Self {
1260 server_builder: ServerBuilder::new(),
1261 }
1262 }
1263
1264 pub fn follower<'b: 'a>(&'b mut self) -> TestContext {
1265 let (mut in_queue_test, mut out_queue_test, mut server, cluster, comm_service) =
1266 self.server_builder.follower();
1267 TestContext {
1268 in_queue_test,
1269 out_queue_test,
1270 server,
1271 cluster,
1272 comm_service,
1273 }
1274 }
1275 pub fn leader<'b: 'a>(&'b mut self) -> TestContext {
1276 let (mut in_queue_test, mut out_queue_test, mut server, cluster, comm_service) =
1277 self.server_builder.leader();
1278 TestContext {
1279 in_queue_test,
1280 out_queue_test,
1281 server,
1282 cluster,
1283 comm_service,
1284 }
1285 }
1286 }
1287
1288 pub type BuilderOutput<'b> = (
1289 FakeDcsInMsgQueue<RaftPackage<TestState>>,
1290 FakeDcsOutMsgQueue<RaftPackage<TestState>>,
1291 RaftService<FakeTimer, TestState>,
1292 LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT>,
1293 &'b mut FakeCommunicationService<'b>,
1294 );
1295
1296 pub struct ServerBuilder<'a> {
1297 test_rx: Option<FakeDcsInMsgQueue<RaftPackage<TestState>>>,
1298 comm_service_tx: FakeDcsOutMsgQueue<RaftPackage<TestState>>,
1299 test_tx: Option<FakeDcsOutMsgQueue<RaftPackage<TestState>>>,
1300 comm_service_rx: FakeDcsInMsgQueue<RaftPackage<TestState>>,
1301 communication_service: Option<FakeCommunicationService<'a>>,
1302 }
1303
1304 impl<'a> ServerBuilder<'a> {
1305 #[allow(clippy::new_without_default)]
1306 pub fn new() -> Self {
1307 let (out_rx, out_tx) = FakeDcsMsgQueue::build_pair();
1308 let (in_rx, in_tx) = FakeDcsMsgQueue::build_pair();
1309
1310 Self {
1311 test_rx: Some(out_rx),
1312 comm_service_tx: out_tx,
1313 test_tx: Some(in_tx),
1314 comm_service_rx: in_rx,
1315 communication_service: None,
1316 }
1317 }
1318
1319 pub fn follower<'b: 'a>(&'b mut self) -> BuilderOutput {
1320 let mut timer: FakeTimer = FakeTimer::new();
1321
1322 self.communication_service = Some(FakeCommunicationService::new(
1323 &mut self.comm_service_tx,
1324 &mut self.comm_service_rx,
1325 ));
1326 timer = Stopwatch::from_millis(ELECTION_TIMEOUT);
1327 let mut cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> =
1328 LinearMap::new();
1329 let _ = cluster.insert(
1330 0.into(),
1331 ClusterMember {
1332 id: SystemNodeId::from(0),
1333 vote_granted: Abstained,
1334 match_idx: 0,
1335 next_idx: 1,
1336 last_successful_heartbeat: 0,
1337 },
1338 );
1339 let _ = cluster.insert(
1340 2.into(),
1341 ClusterMember {
1342 id: SystemNodeId::from(2),
1343 vote_granted: Abstained,
1344 match_idx: 0,
1345 next_idx: 1,
1346 last_successful_heartbeat: 0,
1347 },
1348 );
1349 let mut server = RaftService::new(
1350 timer, Some(cluster.clone()), SystemNodeId::from(1), ELECTION_TIMEOUT
1351 );
1352
1353 server.leader_id = Some(SystemNodeId::from(0));
1354 (
1355 self.test_rx.take().unwrap(),
1356 self.test_tx.take().unwrap(),
1357 server,
1358 cluster,
1359 self.communication_service.as_mut().unwrap(),
1360 )
1361 }
1362
1363 pub fn server_in_cluster<'b: 'a>(&'b mut self) -> BuilderOutput {
1364 self.communication_service = Some(FakeCommunicationService::new(
1365 &mut self.comm_service_tx,
1366 &mut self.comm_service_rx,
1367 ));
1368 let mut timer: FakeTimer = FakeTimer::new();
1369
1370 let cluster = <ServerBuilder<'a>>::initialize_cluster_with_members();
1371 timer = Stopwatch::from_millis(ELECTION_TIMEOUT);
1372 let server = RaftService::new(
1373 timer, Some(cluster.clone()), SystemNodeId::from(0), ELECTION_TIMEOUT
1374 );
1375 (
1376 self.test_rx.take().unwrap(),
1377 self.test_tx.take().unwrap(),
1378 server,
1379 cluster,
1380 self.communication_service.as_mut().unwrap(),
1381 )
1382 }
1383
1384 pub fn candidate<'b: 'a>(&'b mut self) -> BuilderOutput {
1385 self.communication_service = Some(FakeCommunicationService::new(
1386 &mut self.comm_service_tx,
1387 &mut self.comm_service_rx,
1388 ));
1389 let timer: FakeTimer = FakeTimer::new();
1390
1391 let cluster = <ServerBuilder<'a>>::initialize_cluster_with_members();
1392
1393 let mut server = RaftService::new(
1394 timer, Some(cluster.clone()), SystemNodeId::from(0), ELECTION_TIMEOUT
1395 );
1396 let communication_service = self.communication_service.as_mut().unwrap();
1397 server.tick(communication_service); server.io.timer = Stopwatch::from_millis(CANDIATE_ELECTION_TIMEOUT);
1399 self.test_rx.as_mut().unwrap().pop().unwrap(); self.test_rx.as_mut().unwrap().pop().unwrap(); (
1403 self.test_rx.take().unwrap(),
1404 self.test_tx.take().unwrap(),
1405 server,
1406 cluster,
1407 communication_service,
1408 )
1409 }
1410
1411 pub fn leader<'b: 'a>(&'b mut self) -> BuilderOutput {
1412 self.communication_service = Some(FakeCommunicationService::new(
1413 &mut self.comm_service_tx,
1414 &mut self.comm_service_rx,
1415 ));
1416
1417 let mut cluster = <ServerBuilder<'a>>::initialize_cluster_with_members();
1418 let timer: FakeTimer = FakeTimer::new();
1419
1420 let mut server = RaftService::new(
1421 timer, Some(cluster.clone()), SystemNodeId::from(0), ELECTION_TIMEOUT
1422 );
1423 let communication_service = self.communication_service.as_mut().unwrap();
1424 server.tick(communication_service); self.test_rx.as_mut().unwrap().pop().unwrap(); self.test_rx.as_mut().unwrap().pop().unwrap(); let server_id = server.id.clone();
1429 for member in cluster.values().filter(|member| member.id != server_id) {
1430 let grant_vote = RaftMessage::RequestVoteResponse(RequestVoteResponseResult {
1431 term: 1,
1432 granted: true,
1433 });
1434 let msg = package(grant_vote)
1435 .from(member.id)
1436 .to(server.id)
1437 .build()
1438 .unwrap();
1439 self.test_tx.as_mut().unwrap().push(msg);
1440 server.tick(communication_service);
1441 }
1442
1443 for member in cluster.values().filter(|member| member.id != server_id) {
1444 let accept_appendlog = RaftMessage::AppendLogResponse(AppendLogResponseResult {
1445 term: 1,
1446 success: true,
1447 });
1448 let msg = package(accept_appendlog)
1449 .from(member.id)
1450 .to(server.id)
1451 .build()
1452 .unwrap();
1453 self.test_tx.as_mut().unwrap().push(msg);
1454 server.tick(communication_service);
1455 }
1456
1457 for _ in cluster.values().filter(|member| member.id != server.id) {
1458 self.test_rx.as_mut().unwrap().pop(); }
1460 (
1461 self.test_rx.take().unwrap(),
1462 self.test_tx.take().unwrap(),
1463 server,
1464 cluster,
1465 communication_service,
1466 )
1467 }
1468
1469 fn initialize_cluster_with_members() -> LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> {
1470 let mut cluster: LinearMap<SystemNodeId, ClusterMember, CLUSTER_NODE_COUNT> = LinearMap::new();
1471 let _ = cluster.insert(
1472 SystemNodeId::from(1),
1473 ClusterMember {
1474 id: SystemNodeId::from(1),
1475 vote_granted: Abstained,
1476 match_idx: 0,
1477 next_idx: 1,
1478 last_successful_heartbeat: 0,
1479 },
1480 );
1481 let _ = cluster.insert(
1482 SystemNodeId::from(2),
1483 ClusterMember {
1484 id: SystemNodeId::from(2),
1485 vote_granted: Abstained,
1486 match_idx: 0,
1487 next_idx: 1,
1488 last_successful_heartbeat: 0,
1489 },
1490 );
1491 cluster
1492 }
1493 }
1494}
/// Asserts that `$message` is a `RequestVote` whose field `$field` equals
/// `$value`.
///
/// Usage: `assert_message!(term in msg == 1)`. Any message that is not a
/// `RequestVote` calls `fail()`. `RequestVote`, `RequestVoteArgs` and
/// `fail` must be in scope at the call site.
#[macro_export]
macro_rules! assert_message {
    ($field:ident in $message:ident == $value:expr) => {{
        if let RequestVote(RequestVoteArgs { $field, .. }) = $message {
            assert_eq!($value, $field)
        } else {
            fail()
        }
    }};
}