dcs2_raft/server/
follower.rs

1use dcs::communication::messages::Package;
2use dcs::communication::service::CommunicationService;
3use dcs::coordination::Stopwatch;
4
5use crate::messages::{RaftMessage};
6use crate::server::MemberState::Follower;
7use crate::server::{FollowerBehavior, LogData, RaftPackage, RaftService, MemberState};
8use log::*;
9
10impl<T: Stopwatch, L: LogData> FollowerBehavior<T, L> for RaftService<T, L> {
11    fn parse_message(
12        &mut self,
13        package: RaftPackage<L>,
14        comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
15    ) -> MemberState {
16        let Package { header, body } = package;
17        match body {
18            RaftMessage::RequestVote(req) => {
19                <dyn follower_behaviour::Follower<T, L>>::handle_request_vote(
20                    self,
21                    header,
22                    req,
23                    comm_service,
24                )
25            }
26            RaftMessage::AppendLog(req) => {
27                <dyn follower_behaviour::Follower<T, L>>::handle_append_log(
28                    self,
29                    header,
30                    req,
31                    comm_service,
32                )
33            }
34            RaftMessage::ReadRequest => self.reject_read_request(header, comm_service),
35            RaftMessage::WriteRequestReply(_, _) => {}
36            RaftMessage::WriteRequest(_args) => self.reject_write_request(header, comm_service),
37            RaftMessage::InstallSnapshot(args) => {
38                <dyn follower_behaviour::Follower<T, L>>::handle_install_snapshot(
39                    self,
40                    header,
41                    args,
42                    comm_service,
43                )
44            }
45            msg => log::warn!("Unexpected message received as Follower: {:?}", msg),
46        }
47        Follower
48    }
49
50    fn after_tick(&mut self, comm_service: &mut dyn CommunicationService<RaftPackage<L>>) {
51        // apply new config
52        let last_config = self
53            .log
54            .iter()
55            .filter_map(|entry| entry.config_change.as_ref())
56            .last();
57        let current_config = self.current_config();
58        if last_config.is_some() && last_config.unwrap() != &current_config {
59            let last = last_config.cloned().unwrap();
60            self.update_config(&last);
61        }
62        if self.io.timer.is_timeout() {
63            self.start_election(comm_service)
64        }
65    }
66}
67
68mod follower_behaviour {
69    use std::cmp::min;
70    use dcs::communication::messages::{Header, PackageBuilder};
71    use dcs::communication::service::CommunicationService;
72    use log::{debug, info, trace};
73
74    use dcs::coordination::Stopwatch;
75    use dcs::nodes::SystemNodeId;
76
77    use crate::messages::RaftMessage::{AppendLogResponse, RequestVoteResponse};
78    use crate::messages::*;
79    use crate::server::ElectionVote::{Abstained, Granted};
80    use crate::server::{package, LogData, RaftPackage, RaftService};
81
82    pub trait Follower<T, L> {}
83
84    impl<T: Stopwatch, L: LogData> dyn Follower<T, L> {
85        pub fn handle_install_snapshot(
86            server: &mut RaftService<T, L>,
87            header: Header<SystemNodeId>,
88            args: InstallSnapshotArgs<L>,
89            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
90        ) {
91            if server.term <= args.term {
92                server
93                    .log
94                    .install_snapshot(args.last_included_index, args.data.clone());
95                server.leader_id = Some(args.leader_id);
96            }
97            let msg = RaftMessage::InstallSnapshotResponse(server.term);
98            let pkg = package(msg)
99                .from(server.id.into())
100                .to(header.from)
101                .build()
102                .unwrap();
103            comm_service.push(pkg);
104        }
105
106        pub fn handle_append_log(
107            server: &mut RaftService<T, L>,
108            header: Header<SystemNodeId>,
109            args: AppendLogArgs<L>,
110            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
111        ) {
112            if !server.cluster.contains_key(&header.from) {
113                debug!("Ignoring msg {:?} from {:?}", args, header.from);
114                return;
115            }
116            if Self::sender_log_out_of_date(&server.term, &args.term) {
117                debug!("Rejecting append log request: sender is out of date.");
118                Self::reject_append_log(server, header, comm_service);
119            } else if Self::log_is_inconsistent(&args.prev_log_index, &server.log, &args.prev_log_term) {
120                debug!("Rejecting append log request: log is inconsistent.");
121                Self::remove_entry_and_all_after(&args.prev_log_index, &mut server.log);
122                Self::reject_append_log(server, header, comm_service);
123            } else {
124                debug!("Accepting append log request.");
125                server.io.set_follower_timeout();
126                server.clean_state_from_previous_election();
127
128                if let Some(leader_id) = server.leader_id {
129                    if leader_id != header.from {
130                        info!("Becoming follower of node #{}.", header.from)
131                    }
132                } else if server.leader_id.is_none() {
133                    info!("Becoming follower of node #{}.", header.from)
134                }
135                server.leader_id = Some(header.from.into());
136
137                if !args.entries.is_empty() {
138                    Self::append_entries(&mut server.log, &args);
139                }
140                server.commit_index = min(args.leader_commit, server.log.last_index() as LogIndex);
141                Self::accept_append_log(server, header, comm_service);
142            }
143        }
144
145        fn sender_log_out_of_date(server_term: &Term, sender_term: &Term) -> bool {
146            sender_term < server_term
147        }
148
149        fn accept_append_log(
150            server: &mut RaftService<T, L>,
151            header: Header<SystemNodeId>,
152            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
153        ) {
154            Self::answer_append_log(server, header, true, comm_service)
155        }
156
157        fn answer_append_log(
158            server: &mut RaftService<T, L>,
159            header: Header<SystemNodeId>,
160            success: bool,
161            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
162        ) {
163            let answer = AppendLogResponse(AppendLogResponseResult { term: server.term, success});
164            comm_service.push(server.build_message(answer, header.from).unwrap())
165        }
166
167        fn append_entries(log: &mut Log<L>, args: &AppendLogArgs<L>) {
168            let starting_index = args.prev_log_index;
169            debug!("Appending entries into log, starting at position {}.", starting_index);
170            trace!("New entries: {:?}", args.entries);
171            trace!("Log: {:?}", log);
172            for i in 1..args.entries.len() + 1 {
173                let current_position = starting_index as usize + i;
174                log.insert(
175                    current_position as LogIndex,
176                    args.entries.get(i as LogIndex).unwrap().clone(),
177                );
178            }
179            trace!("Updated log: {:?}", log);
180        }
181
182        fn remove_entry_and_all_after(idx: &LogIndex, log: &mut Log<L>) {
183            let elements_to_remove = log.len() as i32 - *idx as i32;
184            for _ in 0..elements_to_remove {
185                log.pop();
186            }
187        }
188
189        fn log_is_inconsistent(prev_log_index: &LogIndex, logs: &Log<L>, term: &Term) -> bool {
190            !Self::log_is_consistent(prev_log_index, logs, term)
191        }
192
193        fn log_is_consistent(prev_log_index: &LogIndex, logs: &Log<L>, term: &Term) -> bool {
194            trace!("prev log idx {:?} term {:?}", prev_log_index, term);
195            if *prev_log_index == 0 {
196                return true;
197            }
198            trace!(
199                "element at prev_log_idx - 1 {:?}",
200                logs.get(*prev_log_index)
201            );
202            trace!("log: {:?}", logs);
203            logs.get(*prev_log_index)
204                .map(|entry| entry.term == *term)
205                .unwrap_or_else(|| false)
206        }
207
208        fn reject_append_log(
209            server: &mut RaftService<T, L>,
210            header: Header<SystemNodeId>,
211            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
212        ) {
213            Self::answer_append_log(server, header, false, comm_service)
214        }
215
216        pub fn handle_request_vote(
217            server: &mut RaftService<T, L>,
218            header: Header<SystemNodeId>,
219            args: RequestVoteArgs,
220            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
221        ) {
222            if !server.cluster.contains_key(&header.from) {
223                debug!("Ignoring msg {:?} from {:?}", args, header.from);
224                return;
225            }
226            let candidate_log_out_of_date = Self::candidate_log_is_out_of_date(
227                &args.prev_log_index,
228                &server.log,
229                &args.prev_log_term,
230            );
231            if Self::vote_granted_to_other_candidate(server) || candidate_log_out_of_date {
232                debug!(
233                    "Rejecting vote request. {}",
234                    if candidate_log_out_of_date {
235                        "Log out of date"
236                    } else {
237                        "Granted to another candidate"
238                    }
239                );
240                Self::reject_vote_request(server, header, comm_service);
241            } else {
242                debug!("Granting vote request");
243                server.io.set_follower_timeout();
244                Self::update_term(server, args.term);
245                Self::grant_vote_request(server, header, comm_service);
246            }
247        }
248
249        fn vote_granted_to_other_candidate(server: &mut RaftService<T, L>) -> bool {
250            debug!("vote_granted_to_other_candidate {:?}", &server.cluster);
251            server
252                .cluster
253                .values()
254                .any(|member| member.vote_granted == Granted)
255        }
256
257        fn update_term(server: &mut RaftService<T, L>, term: Term) {
258            if server.term < term {
259                server.term = term;
260            }
261        }
262
263        fn grant_vote_request(
264            server: &mut RaftService<T, L>,
265            header: Header<SystemNodeId>,
266            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
267        ) {
268            let answer = RequestVoteResponse(RequestVoteResponseResult {
269                term: server.term,
270                granted: true,
271            });
272            if let Some(member) = server.cluster.get_mut(&header.from) {
273                member.vote_granted = Granted
274            }
275            comm_service.push(server.build_message(answer, header.from).unwrap())
276        }
277
278        fn candidate_log_is_out_of_date(
279            prev_log_index: &LogIndex,
280            log: &Log<L>,
281            prev_log_term: &Term,
282        ) -> bool {
283            let follower_last_entry_has_greater_term = log
284                .last()
285                .map(|entry| entry.term > *prev_log_term)
286                .unwrap_or(false);
287
288            let terms_are_equal = log
289                .last()
290                .map(|entry| entry.term == *prev_log_term)
291                .unwrap_or(false);
292
293            let follower_log_is_longer = log.len() > *prev_log_index as usize;
294            log::debug!(
295                "{} {} {}",
296                follower_last_entry_has_greater_term,
297                terms_are_equal,
298                follower_log_is_longer
299            );
300
301            follower_last_entry_has_greater_term || (terms_are_equal && follower_log_is_longer)
302        }
303
304        fn reject_vote_request(
305            server: &mut RaftService<T, L>,
306            header: Header<SystemNodeId>,
307            comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
308        ) {
309            let answer = RequestVoteResponse(RequestVoteResponseResult {
310                term: server.term,
311                granted: false,
312            });
313            comm_service.push(server.build_message(answer, header.from).unwrap())
314        }
315    }
316}
317
318#[allow(non_snake_case)]
319#[cfg(test)]
320mod given_follower {
321    use core::fmt::Debug;
322    use dcs::communication::connection::{InMsgQueue, OutMsgQueue};
323    use dcs::communication::messages::PackageBuilder;
324    use dcs::communication::service::CommunicationService;
325    use dcs::nodes::SystemNodeId;
326
327    use crate::messages::RaftMessage::{
328        AppendLog, AppendLogResponse, RequestVote, RequestVoteResponse,
329    };
330    use crate::messages::RaftMessage::{InstallSnapshot, WriteRequest};
331    use crate::messages::*;
332    use crate::messages::{InstallSnapshotArgs, LogEntry, LogIndex, RaftMessage};
333    use crate::server::follower::test_utils::FollowerContext;
334    use crate::server::test_server_builder::ContextBuilder;
335    use crate::server::test_server_builder::ServerBuilder;
336    use crate::server::test_utils::{fail, FakeTimer, TestState};
337    use crate::server::{package, LogData, RaftPackage, RaftService};
338    use dcs::rules::measurements::Measurement;
339    use dcs::rules::measurements::ClusterType::TEMPERATURE;
340
341    pub fn some_entry() -> Option<TestState> {
342        Some(TestState(1, 1))
343    }
344
345    mod receives_append_entry {
346        use super::*;
347        use crate::server::follower::test_utils::FollowerContext;
348        use crate::server::test_server_builder::ContextBuilder;
349        use crate::server::test_utils::almost_timeout_follower;
350
351        #[test]
352        fn duplicated_entries_are_ignored() {
353            let mut builder = ServerBuilder::new();
354            let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
355                builder.follower();
356
357            send_append_log(&mut out_queue_test, 0, 0, some_entry());
358            send_append_log(&mut out_queue_test, 0, 0, some_entry());
359            server.tick(comm_service);
360            server.tick(comm_service);
361
362            let package = in_queue_test.pop().unwrap();
363            let msg = package.get_message();
364            assert!(append_log_is_successful(msg));
365            let package = in_queue_test.pop().unwrap();
366            let msg = package.get_message();
367            assert!(append_log_is_successful(msg));
368
369            assert_eq!(some_entry(), server.log.get(1).unwrap().data);
370            assert_eq!(None, server.log.get(2));
371        }
372
373        #[test]
374        fn from_leader__then_resets_timer() {
375            let mut builder = ServerBuilder::new();
376            let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
377                builder.follower();
378            almost_timeout_follower(&mut server, comm_service); //next tick should trigger election if doesn't receive heartbeat
379
380            send_append_log(&mut out_queue_test, 0, 0, some_entry());
381            server.tick(comm_service);
382
383            assert!(follower_didnt_start_election(&mut in_queue_test))
384        }
385
386        #[test]
387        fn if_invalid_then_dont_update_commit_idx() {
388            let mut builder = ContextBuilder::new();
389            let mut context: FollowerContext = builder.follower().into();
390
391            context.send_append_log_and_tick(vec![(None, 1)], 2, 0, 32);
392
393            let server = context.inner.get_node();
394            assert_eq!(server.commit_index, 0)
395        }
396
397        #[test]
398        fn with_multiple_entries_if_valid_then_updates_commit_index_with_last_new_entry_idx() {
399            let mut builder = ContextBuilder::new();
400            let mut context: FollowerContext = builder.follower().into();
401
402            let prev_log_term = 1;
403            let entries = vec![
404                (None, prev_log_term),
405                (None, prev_log_term),
406                (None, prev_log_term),
407            ];
408            context.send_append_log_and_tick(entries, 0, prev_log_term, 32);
409
410            let server = context.inner.get_node();
411            assert_eq!(server.commit_index, 3)
412        }
413
414        #[test]
415        fn with_single_entry_if_valid_then_updates_commit_index_with_last_new_entry_idx() {
416            let mut builder = ContextBuilder::new();
417            let mut context: FollowerContext = builder.follower().into();
418
419            let prev_log_term = 1;
420            context.send_append_log_and_tick(vec![(None, prev_log_term)], 0, prev_log_term, 32);
421
422            let server = context.inner.get_node();
423            assert_eq!(server.commit_index, 1)
424        }
425
426        #[test]
427        fn if_valid_then_updates_commit_index_with_leader_commit_idx() {
428            let mut builder = ContextBuilder::new();
429            let mut context: FollowerContext = builder.follower().into();
430
431            let prev_log_term = 1;
432            context.send_append_log_and_tick(vec![(None, prev_log_term)], 0, prev_log_term, 32);
433            context.send_append_log_and_tick(vec![(None, prev_log_term)], 1, prev_log_term, 32);
434            context.send_append_log_and_tick(vec![(None, prev_log_term)], 2, prev_log_term, 32);
435            context.send_append_log_and_tick(vec![(None, prev_log_term)], 3, prev_log_term, 1);
436
437            let server = context.inner.get_node();
438            assert_eq!(server.commit_index, 1)
439        }
440
441        mod with_empty_log {
442            use super::*;
443
444            #[test]
445            fn and_entry_has_greater_nextIndex__then_request_fails() {
446                let mut builder = ServerBuilder::new();
447                let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
448                    builder.follower();
449
450                send_append_log(&mut out_queue_test, 2, 0, some_entry());
451                server.tick(comm_service);
452
453                let package = in_queue_test.pop().unwrap();
454                let msg = package.get_message();
455                assert!(append_log_is_not_successful(msg));
456            }
457
458            #[test]
459            fn and_entry_has_correct_nextIndex_and_term_match__then__request_succeeds() {
460                let mut builder = ServerBuilder::new();
461                let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
462                    builder.follower();
463
464                send_append_log(&mut out_queue_test, 0, 0, some_entry());
465                server.tick(comm_service);
466
467                let package = in_queue_test.pop().unwrap();
468                let msg = package.get_message();
469                assert!(append_log_is_successful(msg));
470            }
471        }
472
473        mod with_nonempty_log {
474            use super::*;
475
476            #[test]
477            fn when_receives_append_entry_with_correct_nextIndex_but_term_doesnt_match__then_request_fails(
478            ) {
479                let mut builder = ServerBuilder::new();
480                let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
481                    builder.follower();
482                add_log_entry_to_server(
483                    &mut server,
484                    &mut out_queue_test,
485                    &mut in_queue_test,
486                    comm_service,
487                    some_entry(),
488                );
489
490                send_append_log(&mut out_queue_test, 1, 99, some_entry());
491                server.tick(comm_service);
492
493                if let AppendLogResponse(AppendLogResponseResult { success, .. }) =
494                    in_queue_test.pop().unwrap().get_message()
495                {
496                    assert!(!success)
497                } else {
498                    fail()
499                }
500            }
501
502            #[test]
503            fn when_receives_append_entry_with_smaller_nextIndex__then_request_succeeds_and_log_is_updated(
504            ) {
505                let mut builder = ServerBuilder::new();
506                let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
507                    builder.follower();
508
509                add_log_entries_to_server(
510                    &mut server,
511                    &mut out_queue_test,
512                    &mut in_queue_test,
513                    comm_service,
514                    vec![some_entry(), some_other_entry()],
515                );
516
517                send_append_log(&mut out_queue_test, 1, 1, some_entry());
518                server.tick(comm_service);
519
520                assert!(append_log_is_successful(
521                    in_queue_test.pop().unwrap().get_message()
522                ));
523                assert_eq!(some_entry(), server.log.get(1).unwrap().data);
524                assert_eq!(some_entry(), server.log.get(2).unwrap().data);
525            }
526
527            #[test]
528            fn when_receives_append_entry_with_correct_nextIndex_and_term__then_request_succeeds() {
529                let mut builder = ServerBuilder::new();
530                let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
531                    builder.follower();
532                add_log_entry_to_server(
533                    &mut server,
534                    &mut out_queue_test,
535                    &mut in_queue_test,
536                    comm_service,
537                    some_entry(),
538                );
539
540                send_append_log(&mut out_queue_test, 1, 1, some_entry());
541                server.tick(comm_service);
542
543                let package = in_queue_test.pop().unwrap();
544                let msg = package.get_message();
545                assert!(append_log_is_successful(msg));
546            }
547        }
548
549        mod receives_new_config {
550            use super::*;
551
552            #[test]
553            fn applies_it_immediately() {
554                let mut builder = ContextBuilder::new();
555                let mut context = builder.follower();
556
557                let mut new_config = context.current_config();
558                new_config.remove(2);
559                let entry =
560                    LogEntry::<TestState>::with_config(context.term(), Some(new_config.clone()));
561                let mut log = Log::new();
562                log.push(entry);
563                let args = AppendLogArgs {
564                    term: 0,
565                    prev_log_index: 0,
566                    prev_log_term: 0,
567                    entries: log,
568                    leader_commit: 0,
569                };
570                context.empty_queue();
571                context.send_and_tick(context.package(AppendLog(args)));
572                assert_eq!(
573                    AppendLogResponse(AppendLogResponseResult {
574                        term: 0,
575                        success: true
576                    }),
577                    context.recv_message().unwrap()
578                );
579
580                assert_eq!(2, context.cluster_size());
581            }
582        }
583    }
584
585    mod receives_vote_request {
586        use super::*;
587        use crate::server::test_utils::almost_timeout_follower;
588
589        #[test]
590        fn at_startup_when_granting_vote_request_then_resets_timer() {
591            let mut builder = ServerBuilder::new();
592            let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
593                builder.server_in_cluster();
594            almost_timeout_follower(&mut server, comm_service);
595
596            send_request_vote(&mut out_queue_test, 0, 0, 1, 1);
597            server.tick(comm_service);
598
599            assert!(follower_didnt_start_election(&mut in_queue_test))
600        }
601
602        #[test]
603        fn from_candidate_with_smaller_last_term_in_log__then_request_is_rejected() {
604            let mut builder = ServerBuilder::new();
605            let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
606                builder.follower();
607            add_log_entry_to_server(
608                &mut server,
609                &mut out_queue_test,
610                &mut in_queue_test,
611                comm_service,
612                some_entry(),
613            );
614
615            send_request_vote(&mut out_queue_test, 0, 0, 0, 0);
616            server.tick(comm_service);
617
618            let package = in_queue_test.pop().unwrap();
619            let msg = package.get_message();
620            assert!(request_vote_not_granted(msg));
621        }
622
623        #[test]
624        fn from_candidate_with_same_last_term_and_shorter_log__then_request_is_rejected() {
625            let mut builder = ServerBuilder::new();
626            let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
627                builder.follower();
628            add_log_entry_to_server(
629                &mut server,
630                &mut out_queue_test,
631                &mut in_queue_test,
632                comm_service,
633                some_entry(),
634            );
635            add_log_entry_to_server(
636                &mut server,
637                &mut out_queue_test,
638                &mut in_queue_test,
639                comm_service,
640                some_entry(),
641            );
642
643            send_request_vote(&mut out_queue_test, 0, 0, 0, 0);
644            server.tick(comm_service);
645
646            let package = in_queue_test.pop().unwrap();
647            let msg = package.get_message();
648            assert!(request_vote_not_granted(msg));
649        }
650
651        mod with_nonempty_log {
652            use super::*;
653
654            #[test]
655            fn candidate_has_valid_log__and_voted_granted_to_no_other__then_vote_is_granted() {
656                let mut builder = ServerBuilder::new();
657                let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
658                    builder.follower();
659                add_log_entry_to_server(
660                    &mut server,
661                    &mut out_queue_test,
662                    &mut in_queue_test,
663                    comm_service,
664                    some_entry(),
665                );
666                add_log_entry_to_server(
667                    &mut server,
668                    &mut out_queue_test,
669                    &mut in_queue_test,
670                    comm_service,
671                    some_entry(),
672                );
673
674                send_request_vote(&mut out_queue_test, 2, 1, 1, 0);
675                server.tick(comm_service);
676
677                let package = in_queue_test.pop().unwrap();
678                let msg = package.get_message();
679                assert!(request_vote_granted(msg));
680            }
681
682            #[test]
683            fn candidate_has_valid_log__and_voted_granted_to_other__then_vote_is_denied() {
684                let mut builder = ServerBuilder::new();
685                let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
686                    builder.follower();
687                add_log_entry_to_server(
688                    &mut server,
689                    &mut out_queue_test,
690                    &mut in_queue_test,
691                    comm_service,
692                    some_entry(),
693                );
694                add_log_entry_to_server(
695                    &mut server,
696                    &mut out_queue_test,
697                    &mut in_queue_test,
698                    comm_service,
699                    some_entry(),
700                );
701
702                send_request_vote(&mut out_queue_test, 2, 0, 0, 0);
703                server.tick(comm_service);
704                in_queue_test.pop().unwrap();
705
706                send_request_vote(&mut out_queue_test, 2, 0, 0, 2);
707                server.tick(comm_service);
708
709                let package = in_queue_test.pop().unwrap();
710                let msg = package.get_message();
711                assert!(request_vote_not_granted(msg));
712            }
713        }
714    }
715
716    mod receives_client_interaction {
717        use crate::messages::RaftMessage::{ReadRequestReply, WriteRequestReply};
718        use crate::messages::{ReadRequestReplyArgs};
719        use crate::server::test_server_builder::ServerBuilder;
720        use crate::server::test_utils::{send_read_request, send_write_request};
721        use dcs::communication::connection::InMsgQueue;
722        use dcs::nodes::SystemNodeId;
723        use dcs::rules::measurements::Measurement;
724        use dcs::rules::measurements::ClusterType::TEMPERATURE;
725
726        #[test]
727        fn receives_write_request__then_redirects_to_leader() {
728            let leader_id = SystemNodeId::from(0);
729            let mut builder = ServerBuilder::new();
730            let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
731                builder.follower();
732
733            send_write_request(
734                &mut server,
735                &mut out_queue_test,
736                Measurement::new(TEMPERATURE, 0),
737                comm_service,
738            );
739            let answer = in_queue_test.pop().unwrap();
740            if WriteRequestReply(false, Some(leader_id)) != answer.body {
741                unreachable!("failed")
742            }
743        }
744
745        #[test]
746        fn receives_read_request__then_redirects_to_leader() {
747            let leader_id = SystemNodeId::from(0);
748            let mut builder = ServerBuilder::new();
749            let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
750                builder.follower();
751
752            send_read_request(&mut server, &mut out_queue_test, comm_service);
753            let answer = in_queue_test.pop().unwrap();
754            if ReadRequestReply(ReadRequestReplyArgs {
755                success: false,
756                data: None,
757                redirect: Some(leader_id),
758            }) != answer.body
759            {
760                unreachable!("failed")
761            }
762        }
763    }
764
765    mod snapshots {
766        use super::*;
767
768        #[test]
769        pub fn snapshot_occurs_at_75_percent_capacity() {
770            let mut builder = ContextBuilder::new();
771            let mut context: FollowerContext = builder.follower().into();
772
773            let (pre_snapshot_capacity, post_snapshot_capacity) =
774                context.trigger_snapshot_at_75_percent_occupation();
775
776            assert!(pre_snapshot_capacity < post_snapshot_capacity);
777        }
778
779        #[test]
780        pub fn can_receive_append_log_after_snapshot() {
781            let mut builder = ContextBuilder::new();
782            let mut context: FollowerContext = builder.follower().into();
783            context.trigger_snapshot_at_75_percent_occupation();
784            context.inner.empty_queue();
785
786            context.send_append_log_and_tick(vec![(Some(TestState(3, 4)), 0)], 29, 1, u32::MAX);
787
788            let msg = context.inner.recv_message().unwrap();
789            assert!(append_log_is_successful(&msg));
790            let log = context.inner.get_node().log.clone();
791            assert_eq!(TestState(3, 4), log.last().unwrap().data.unwrap());
792        }
793    }
794
795    pub mod given_install_snapshot {
796        use dcs::nodes::SystemNodeId;
797        use super::*;
798
799        #[test]
800        pub fn updates_the_log() {
801            let mut builder = ContextBuilder::new();
802            let mut context: FollowerContext = builder.follower().into();
803
804            let snapshot_entry = context.install_snapshot().data;
805
806            assert_eq!(snapshot_entry, context.last_log_entry())
807        }
808
809        #[test]
810        pub fn updates_leader_id() {
811            let mut builder = ContextBuilder::new();
812            let mut context: FollowerContext = builder.follower().into();
813            context.install_snapshot();
814
815            context.inner.empty_queue();
816            context.inner.send_and_tick(
817                context
818                    .inner
819                    .package(WriteRequest(WriteRequestArgs::with_measurement(4.into(), Measurement::new(TEMPERATURE, 1)))),
820            );
821
822            let msg = context.inner.recv_message().unwrap();
823
824            let id = Some(SystemNodeId::from(1));
825            assert!(
826                matches!(msg, RaftMessage::WriteRequestReply(false, id)),
827                "Did not expect message {msg:?}"
828            );
829        }
830
831        #[test]
832        pub fn replaces_up_to_last_included_entry_with_snapshot() {
833            let mut builder = ContextBuilder::new();
834            let mut context: FollowerContext = builder.follower().into();
835            let prev_log_term = 1;
836            context.send_append_log_and_tick(
837                vec![(Some(TestState(1, 1)), prev_log_term)],
838                0,
839                prev_log_term,
840                32,
841            );
842            context.send_append_log_and_tick(
843                vec![(Some(TestState(2, 2)), prev_log_term)],
844                1,
845                prev_log_term,
846                32,
847            );
848            context.send_append_log_and_tick(
849                vec![(Some(TestState(3, 3)), prev_log_term)],
850                2,
851                prev_log_term,
852                32,
853            );
854            context.send_append_log_and_tick(
855                vec![(Some(TestState(4, 4)), prev_log_term)],
856                3,
857                prev_log_term,
858                32,
859            );
860
861            let log_before_snapshot = context.log();
862
863            let install_snapshot = context.install_snapshot();
864
865            let log_after_snapshot = context.log();
866
867            let len_after_entries_removed =
868                log_before_snapshot.len() - install_snapshot.last_included_index as usize;
869            let snapshot_entries_added = 1;
870            assert_eq!(
871                len_after_entries_removed + snapshot_entries_added,
872                log_after_snapshot.len()
873            )
874        }
875
876        #[test]
877        pub fn if_term_is_less_than_current_term_responds_immediately_and_dont_update() {
878            let mut builder = ContextBuilder::new();
879            let mut context: FollowerContext = builder.follower().into();
880            let prev_log_term = 1;
881            context.send_append_log_and_tick(
882                vec![(Some(TestState(1, 1)), prev_log_term)],
883                0,
884                prev_log_term,
885                32,
886            );
887            context.send_append_log_and_tick(
888                vec![(Some(TestState(2, 2)), prev_log_term)],
889                1,
890                prev_log_term,
891                32,
892            );
893            context.send_append_log_and_tick(
894                vec![(Some(TestState(3, 3)), prev_log_term)],
895                2,
896                prev_log_term,
897                32,
898            );
899            context.send_append_log_and_tick(
900                vec![(Some(TestState(4, 4)), prev_log_term)],
901                3,
902                prev_log_term,
903                32,
904            );
905            context.inner.empty_queue();
906            let log_before = context.log();
907
908            context.set_term(15);
909            let install_snapshot = context.install_snapshot();
910
911            let log_after = context.log();
912            let message = context.inner.recv_message().unwrap();
913            assert!(matches!(message, RaftMessage::InstallSnapshotResponse(15)));
914            assert_eq!(log_before, log_after);
915        }
916
917        #[test]
918        pub fn responds_with_current_term() {
919            let mut builder = ContextBuilder::new();
920            let mut context: FollowerContext = builder.follower().into();
921            let prev_log_term = 1;
922            context.send_append_log_and_tick(
923                vec![(Some(TestState(1, 1)), prev_log_term)],
924                0,
925                prev_log_term,
926                32,
927            );
928            context.send_append_log_and_tick(
929                vec![(Some(TestState(2, 2)), prev_log_term)],
930                1,
931                prev_log_term,
932                32,
933            );
934            context.send_append_log_and_tick(
935                vec![(Some(TestState(3, 3)), prev_log_term)],
936                2,
937                prev_log_term,
938                32,
939            );
940            context.send_append_log_and_tick(
941                vec![(Some(TestState(4, 4)), prev_log_term)],
942                3,
943                prev_log_term,
944                32,
945            );
946            context.inner.empty_queue();
947
948            context.set_term(5);
949            let install_snapshot = context.install_snapshot();
950
951            let message = context.inner.recv_message().unwrap();
952            assert!(
953                matches!(message, RaftMessage::InstallSnapshotResponse(5)),
954                "Wasn't expecting {message:?}"
955            );
956        }
957    }
958
959    pub fn some_other_entry() -> Option<TestState> {
960        Some(TestState(37, 12))
961    }
962
963    fn append_log_is_successful(msg: &RaftMessage<TestState>) -> bool {
964        matches!(msg, AppendLogResponse(AppendLogResponseResult { success, .. }) if *success)
965    }
966
967    fn append_log_is_not_successful(msg: &RaftMessage<TestState>) -> bool {
968        matches!(msg, AppendLogResponse(AppendLogResponseResult { success, .. }) if !success)
969    }
970
971    fn send_append_log<T: LogData + Debug>(
972        out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
973        prev_log_index: LogIndex,
974        prev_log_term: Term,
975        data: Option<T>,
976    ) {
977        let mut entries: Log<T> = Default::default();
978        let _ = entries.push(LogEntry::with_data(prev_log_term, data));
979
980        let append_log = AppendLog(AppendLogArgs {
981            term: 1,
982            prev_log_index,
983            prev_log_term,
984            entries,
985            leader_commit: 0,
986        });
987
988        let append_log_req = package(append_log)
989            .from(0.into())
990            .to(1.into())
991            .build()
992            .unwrap();
993
994        out_queue_test.push(append_log_req);
995    }
996
997    fn add_log_entries_to_server<L: LogData + Debug>(
998        server: &mut RaftService<FakeTimer, L>,
999        out_queue_test: &mut dyn OutMsgQueue<RaftPackage<L>>,
1000        in_queue_test: &mut dyn InMsgQueue<RaftPackage<L>>,
1001        comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
1002        entry: Vec<Option<L>>,
1003    ) {
1004        for (idx, e) in entry.into_iter().enumerate() {
1005            send_append_log(out_queue_test, idx as LogIndex, 1, e);
1006            server.tick(comm_service);
1007            in_queue_test.pop().unwrap();
1008        }
1009    }
1010
1011    fn add_log_entry_to_server<L: LogData + Debug>(
1012        server: &mut RaftService<FakeTimer, L>,
1013        out_queue_test: &mut dyn OutMsgQueue<RaftPackage<L>>,
1014        in_queue_test: &mut dyn InMsgQueue<RaftPackage<L>>,
1015        comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
1016        entry: Option<L>,
1017    ) {
1018        send_append_log(out_queue_test, 0, 1, entry);
1019        server.tick(comm_service);
1020        in_queue_test.pop().unwrap();
1021    }
1022
1023    fn request_vote_granted(msg: &RaftMessage<TestState>) -> bool {
1024        matches!(msg, RequestVoteResponse(RequestVoteResponseResult { granted, .. }) if *granted)
1025    }
1026
1027    fn request_vote_not_granted(msg: &RaftMessage<TestState>) -> bool {
1028        matches!(msg, RequestVoteResponse(RequestVoteResponseResult { granted, .. }) if !granted)
1029    }
1030
1031    fn send_request_vote<T: LogData + Debug>(
1032        out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
1033        prev_log_index: LogIndex,
1034        prev_log_term: Term,
1035        term: Term,
1036        from_id: u32,
1037    ) {
1038        let request_vote = RequestVote(RequestVoteArgs {
1039            term,
1040            prev_log_index,
1041            prev_log_term,
1042        });
1043        let append_log_req = package(request_vote)
1044            .from(SystemNodeId::from(from_id))
1045            .to(1.into())
1046            .build()
1047            .unwrap();
1048        out_queue_test.push(append_log_req);
1049    }
1050
1051    pub fn follower_didnt_start_election(
1052        comm_service: &mut dyn InMsgQueue<RaftPackage<TestState>>,
1053    ) -> bool {
1054        let append_log_response =
1055            RaftMessage::<TestState>::AppendLogResponse(AppendLogResponseResult {
1056                term: 0,
1057                success: true,
1058            });
1059        let actual_append_log_response = comm_service.pop().unwrap().body;
1060        matches!(append_log_response, actual_append_log_response) && comm_service.pop().is_none()
1061    }
1062}
1063
1064#[cfg(test)]
1065mod test_utils {
1066    use dcs::nodes::SystemNodeId;
1067    use crate::messages::RaftMessage::{AppendLog, InstallSnapshot};
1068    use crate::messages::{
1069        AppendLogArgs, InstallSnapshotArgs, Log, LogEntry, LogIndex, Term, LOG_LEN,
1070    };
1071    use crate::server::leader::test_utils::LeaderContext;
1072    use crate::server::test_server_builder::TestContext;
1073    use crate::server::test_utils::TestState;
1074    use dcs::rules::measurements::Measurement;
1075    use dcs::rules::measurements::ClusterType::TEMPERATURE;
1076
1077    pub struct FollowerContext<'a> {
1078        pub inner: TestContext<'a>,
1079    }
1080
1081    impl<'a> FollowerContext<'a> {
1082        pub fn set_term(&mut self, term: Term) {
1083            self.inner.get_node_mut().term = term;
1084        }
1085
1086        pub(crate) fn trigger_snapshot_at_75_percent_occupation(&mut self) -> (f32, f32) {
1087            let count_to_fill_log = LOG_LEN as f32 * 0.74;
1088            for i in 0..count_to_fill_log.floor() as u32 {
1089                self.send_append_log_and_tick(vec![(Some(TestState(2, 2)), 1)], i, 1, u32::MAX);
1090            }
1091            let pre_snapshot_capacity = self.inner.get_node().log.capacity();
1092            let count_to_snapshot = count_to_fill_log * 1.1;
1093            for i in count_to_fill_log as u32..count_to_snapshot.ceil() as u32 {
1094                self.send_append_log_and_tick(vec![(Some(TestState(2, 2)), 1)], i, 1, u32::MAX);
1095            }
1096            let post_snapshot_capacity = self.inner.get_node().log.capacity();
1097            (pre_snapshot_capacity, post_snapshot_capacity)
1098        }
1099
1100        pub fn send_append_log_and_tick(
1101            &mut self,
1102            data: Vec<(Option<TestState>, Term)>,
1103            prev_log_index: LogIndex,
1104            prev_log_term: Term,
1105            leader_commit: LogIndex,
1106        ) {
1107            let mut entries: Log<TestState> = Default::default();
1108            for datum in data {
1109                let _ = entries.push(LogEntry::with_data(datum.1, datum.0));
1110            }
1111
1112            let append_log = AppendLog(AppendLogArgs {
1113                term: 1,
1114                prev_log_index,
1115                prev_log_term,
1116                entries: entries.clone(),
1117                leader_commit,
1118            });
1119
1120            self.inner.send_and_tick(self.inner.package(append_log));
1121        }
1122
1123        pub fn install_snapshot(&mut self) -> InstallSnapshotArgs<TestState> {
1124            let snapshot_entry = LogEntry::new(7, Some(TestState(5, 7)), None, None);
1125
1126            let args = InstallSnapshotArgs {
1127                term: snapshot_entry.term,
1128                leader_id: SystemNodeId::from(1),
1129                last_included_index: 3,
1130                last_included_term: 1,
1131                data: snapshot_entry,
1132            };
1133            let install_snapshot = InstallSnapshot(args.clone());
1134            self.inner
1135                .send_and_tick(self.inner.package(install_snapshot));
1136            args
1137        }
1138
1139        pub fn last_log_entry(&self) -> LogEntry<TestState> {
1140            self.inner.get_node().log.clone().last().unwrap().clone()
1141        }
1142
1143        pub fn log(&self) -> Log<TestState> {
1144            self.inner.get_node().log.clone()
1145        }
1146    }
1147
1148    impl<'a> From<TestContext<'a>> for FollowerContext<'a> {
1149        fn from(context: TestContext<'a>) -> Self {
1150            Self { inner: context }
1151        }
1152    }
1153}
1154
1155#[macro_export]
1156macro_rules! initialize_follower {
1157    ($in_queue_test:ident, $out_queue_test:ident, $server:ident ) => {
1158        let mut timer: FakeTimer = FakeTimer::new();
1159        timer.reset_election_timer();
1160        timer.reset_election_timer();
1161        let (mut $in_queue_test, mut out_queue_server) = FakeMsgQueue::new();
1162        let (mut in_queue_server, mut $out_queue_test) = FakeMsgQueue::new();
1163        let mut cluster: LinearMap<ClusterMemberId, ClusterMember, CLUSTER_NODE_COUNT> =
1164            LinearMap::new();
1165        let _ = cluster.insert(
1166            0,
1167            ClusterMember {
1168                id: 0,
1169                vote_granted: Abstained,
1170                match_idx: 0,
1171                next_idx: 0,
1172            },
1173        );
1174        let _ = cluster.insert(
1175            2,
1176            ClusterMember {
1177                id: 2,
1178                vote_granted: Abstained,
1179                match_idx: 0,
1180                next_idx: 0,
1181            },
1182        );
1183        let mut $server = Server::new(
1184            &mut timer,
1185            &mut out_queue_server,
1186            &mut in_queue_server,
1187            cluster,
1188        );
1189        $server.id = 1;
1190        $server.leader_id = 0;
1191    };
1192}