1use dcs::communication::messages::Package;
2use dcs::communication::service::CommunicationService;
3use dcs::coordination::Stopwatch;
4
5use crate::messages::{RaftMessage};
6use crate::server::MemberState::Follower;
7use crate::server::{FollowerBehavior, LogData, RaftPackage, RaftService, MemberState};
8use log::*;
9
10impl<T: Stopwatch, L: LogData> FollowerBehavior<T, L> for RaftService<T, L> {
11 fn parse_message(
12 &mut self,
13 package: RaftPackage<L>,
14 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
15 ) -> MemberState {
16 let Package { header, body } = package;
17 match body {
18 RaftMessage::RequestVote(req) => {
19 <dyn follower_behaviour::Follower<T, L>>::handle_request_vote(
20 self,
21 header,
22 req,
23 comm_service,
24 )
25 }
26 RaftMessage::AppendLog(req) => {
27 <dyn follower_behaviour::Follower<T, L>>::handle_append_log(
28 self,
29 header,
30 req,
31 comm_service,
32 )
33 }
34 RaftMessage::ReadRequest => self.reject_read_request(header, comm_service),
35 RaftMessage::WriteRequestReply(_, _) => {}
36 RaftMessage::WriteRequest(_args) => self.reject_write_request(header, comm_service),
37 RaftMessage::InstallSnapshot(args) => {
38 <dyn follower_behaviour::Follower<T, L>>::handle_install_snapshot(
39 self,
40 header,
41 args,
42 comm_service,
43 )
44 }
45 msg => log::warn!("Unexpected message received as Follower: {:?}", msg),
46 }
47 Follower
48 }
49
50 fn after_tick(&mut self, comm_service: &mut dyn CommunicationService<RaftPackage<L>>) {
51 let last_config = self
53 .log
54 .iter()
55 .filter_map(|entry| entry.config_change.as_ref())
56 .last();
57 let current_config = self.current_config();
58 if last_config.is_some() && last_config.unwrap() != ¤t_config {
59 let last = last_config.cloned().unwrap();
60 self.update_config(&last);
61 }
62 if self.io.timer.is_timeout() {
63 self.start_election(comm_service)
64 }
65 }
66}
67
68mod follower_behaviour {
69 use std::cmp::min;
70 use dcs::communication::messages::{Header, PackageBuilder};
71 use dcs::communication::service::CommunicationService;
72 use log::{debug, info, trace};
73
74 use dcs::coordination::Stopwatch;
75 use dcs::nodes::SystemNodeId;
76
77 use crate::messages::RaftMessage::{AppendLogResponse, RequestVoteResponse};
78 use crate::messages::*;
79 use crate::server::ElectionVote::{Abstained, Granted};
80 use crate::server::{package, LogData, RaftPackage, RaftService};
81
82 pub trait Follower<T, L> {}
83
84 impl<T: Stopwatch, L: LogData> dyn Follower<T, L> {
85 pub fn handle_install_snapshot(
86 server: &mut RaftService<T, L>,
87 header: Header<SystemNodeId>,
88 args: InstallSnapshotArgs<L>,
89 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
90 ) {
91 if server.term <= args.term {
92 server
93 .log
94 .install_snapshot(args.last_included_index, args.data.clone());
95 server.leader_id = Some(args.leader_id);
96 }
97 let msg = RaftMessage::InstallSnapshotResponse(server.term);
98 let pkg = package(msg)
99 .from(server.id.into())
100 .to(header.from)
101 .build()
102 .unwrap();
103 comm_service.push(pkg);
104 }
105
106 pub fn handle_append_log(
107 server: &mut RaftService<T, L>,
108 header: Header<SystemNodeId>,
109 args: AppendLogArgs<L>,
110 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
111 ) {
112 if !server.cluster.contains_key(&header.from) {
113 debug!("Ignoring msg {:?} from {:?}", args, header.from);
114 return;
115 }
116 if Self::sender_log_out_of_date(&server.term, &args.term) {
117 debug!("Rejecting append log request: sender is out of date.");
118 Self::reject_append_log(server, header, comm_service);
119 } else if Self::log_is_inconsistent(&args.prev_log_index, &server.log, &args.prev_log_term) {
120 debug!("Rejecting append log request: log is inconsistent.");
121 Self::remove_entry_and_all_after(&args.prev_log_index, &mut server.log);
122 Self::reject_append_log(server, header, comm_service);
123 } else {
124 debug!("Accepting append log request.");
125 server.io.set_follower_timeout();
126 server.clean_state_from_previous_election();
127
128 if let Some(leader_id) = server.leader_id {
129 if leader_id != header.from {
130 info!("Becoming follower of node #{}.", header.from)
131 }
132 } else if server.leader_id.is_none() {
133 info!("Becoming follower of node #{}.", header.from)
134 }
135 server.leader_id = Some(header.from.into());
136
137 if !args.entries.is_empty() {
138 Self::append_entries(&mut server.log, &args);
139 }
140 server.commit_index = min(args.leader_commit, server.log.last_index() as LogIndex);
141 Self::accept_append_log(server, header, comm_service);
142 }
143 }
144
145 fn sender_log_out_of_date(server_term: &Term, sender_term: &Term) -> bool {
146 sender_term < server_term
147 }
148
149 fn accept_append_log(
150 server: &mut RaftService<T, L>,
151 header: Header<SystemNodeId>,
152 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
153 ) {
154 Self::answer_append_log(server, header, true, comm_service)
155 }
156
157 fn answer_append_log(
158 server: &mut RaftService<T, L>,
159 header: Header<SystemNodeId>,
160 success: bool,
161 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
162 ) {
163 let answer = AppendLogResponse(AppendLogResponseResult { term: server.term, success});
164 comm_service.push(server.build_message(answer, header.from).unwrap())
165 }
166
167 fn append_entries(log: &mut Log<L>, args: &AppendLogArgs<L>) {
168 let starting_index = args.prev_log_index;
169 debug!("Appending entries into log, starting at position {}.", starting_index);
170 trace!("New entries: {:?}", args.entries);
171 trace!("Log: {:?}", log);
172 for i in 1..args.entries.len() + 1 {
173 let current_position = starting_index as usize + i;
174 log.insert(
175 current_position as LogIndex,
176 args.entries.get(i as LogIndex).unwrap().clone(),
177 );
178 }
179 trace!("Updated log: {:?}", log);
180 }
181
182 fn remove_entry_and_all_after(idx: &LogIndex, log: &mut Log<L>) {
183 let elements_to_remove = log.len() as i32 - *idx as i32;
184 for _ in 0..elements_to_remove {
185 log.pop();
186 }
187 }
188
189 fn log_is_inconsistent(prev_log_index: &LogIndex, logs: &Log<L>, term: &Term) -> bool {
190 !Self::log_is_consistent(prev_log_index, logs, term)
191 }
192
193 fn log_is_consistent(prev_log_index: &LogIndex, logs: &Log<L>, term: &Term) -> bool {
194 trace!("prev log idx {:?} term {:?}", prev_log_index, term);
195 if *prev_log_index == 0 {
196 return true;
197 }
198 trace!(
199 "element at prev_log_idx - 1 {:?}",
200 logs.get(*prev_log_index)
201 );
202 trace!("log: {:?}", logs);
203 logs.get(*prev_log_index)
204 .map(|entry| entry.term == *term)
205 .unwrap_or_else(|| false)
206 }
207
208 fn reject_append_log(
209 server: &mut RaftService<T, L>,
210 header: Header<SystemNodeId>,
211 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
212 ) {
213 Self::answer_append_log(server, header, false, comm_service)
214 }
215
216 pub fn handle_request_vote(
217 server: &mut RaftService<T, L>,
218 header: Header<SystemNodeId>,
219 args: RequestVoteArgs,
220 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
221 ) {
222 if !server.cluster.contains_key(&header.from) {
223 debug!("Ignoring msg {:?} from {:?}", args, header.from);
224 return;
225 }
226 let candidate_log_out_of_date = Self::candidate_log_is_out_of_date(
227 &args.prev_log_index,
228 &server.log,
229 &args.prev_log_term,
230 );
231 if Self::vote_granted_to_other_candidate(server) || candidate_log_out_of_date {
232 debug!(
233 "Rejecting vote request. {}",
234 if candidate_log_out_of_date {
235 "Log out of date"
236 } else {
237 "Granted to another candidate"
238 }
239 );
240 Self::reject_vote_request(server, header, comm_service);
241 } else {
242 debug!("Granting vote request");
243 server.io.set_follower_timeout();
244 Self::update_term(server, args.term);
245 Self::grant_vote_request(server, header, comm_service);
246 }
247 }
248
249 fn vote_granted_to_other_candidate(server: &mut RaftService<T, L>) -> bool {
250 debug!("vote_granted_to_other_candidate {:?}", &server.cluster);
251 server
252 .cluster
253 .values()
254 .any(|member| member.vote_granted == Granted)
255 }
256
257 fn update_term(server: &mut RaftService<T, L>, term: Term) {
258 if server.term < term {
259 server.term = term;
260 }
261 }
262
263 fn grant_vote_request(
264 server: &mut RaftService<T, L>,
265 header: Header<SystemNodeId>,
266 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
267 ) {
268 let answer = RequestVoteResponse(RequestVoteResponseResult {
269 term: server.term,
270 granted: true,
271 });
272 if let Some(member) = server.cluster.get_mut(&header.from) {
273 member.vote_granted = Granted
274 }
275 comm_service.push(server.build_message(answer, header.from).unwrap())
276 }
277
278 fn candidate_log_is_out_of_date(
279 prev_log_index: &LogIndex,
280 log: &Log<L>,
281 prev_log_term: &Term,
282 ) -> bool {
283 let follower_last_entry_has_greater_term = log
284 .last()
285 .map(|entry| entry.term > *prev_log_term)
286 .unwrap_or(false);
287
288 let terms_are_equal = log
289 .last()
290 .map(|entry| entry.term == *prev_log_term)
291 .unwrap_or(false);
292
293 let follower_log_is_longer = log.len() > *prev_log_index as usize;
294 log::debug!(
295 "{} {} {}",
296 follower_last_entry_has_greater_term,
297 terms_are_equal,
298 follower_log_is_longer
299 );
300
301 follower_last_entry_has_greater_term || (terms_are_equal && follower_log_is_longer)
302 }
303
304 fn reject_vote_request(
305 server: &mut RaftService<T, L>,
306 header: Header<SystemNodeId>,
307 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
308 ) {
309 let answer = RequestVoteResponse(RequestVoteResponseResult {
310 term: server.term,
311 granted: false,
312 });
313 comm_service.push(server.build_message(answer, header.from).unwrap())
314 }
315 }
316}
317
318#[allow(non_snake_case)]
319#[cfg(test)]
320mod given_follower {
321 use core::fmt::Debug;
322 use dcs::communication::connection::{InMsgQueue, OutMsgQueue};
323 use dcs::communication::messages::PackageBuilder;
324 use dcs::communication::service::CommunicationService;
325 use dcs::nodes::SystemNodeId;
326
327 use crate::messages::RaftMessage::{
328 AppendLog, AppendLogResponse, RequestVote, RequestVoteResponse,
329 };
330 use crate::messages::RaftMessage::{InstallSnapshot, WriteRequest};
331 use crate::messages::*;
332 use crate::messages::{InstallSnapshotArgs, LogEntry, LogIndex, RaftMessage};
333 use crate::server::follower::test_utils::FollowerContext;
334 use crate::server::test_server_builder::ContextBuilder;
335 use crate::server::test_server_builder::ServerBuilder;
336 use crate::server::test_utils::{fail, FakeTimer, TestState};
337 use crate::server::{package, LogData, RaftPackage, RaftService};
338 use dcs::rules::measurements::Measurement;
339 use dcs::rules::measurements::ClusterType::TEMPERATURE;
340
341 pub fn some_entry() -> Option<TestState> {
342 Some(TestState(1, 1))
343 }
344
345 mod receives_append_entry {
346 use super::*;
347 use crate::server::follower::test_utils::FollowerContext;
348 use crate::server::test_server_builder::ContextBuilder;
349 use crate::server::test_utils::almost_timeout_follower;
350
351 #[test]
352 fn duplicated_entries_are_ignored() {
353 let mut builder = ServerBuilder::new();
354 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
355 builder.follower();
356
357 send_append_log(&mut out_queue_test, 0, 0, some_entry());
358 send_append_log(&mut out_queue_test, 0, 0, some_entry());
359 server.tick(comm_service);
360 server.tick(comm_service);
361
362 let package = in_queue_test.pop().unwrap();
363 let msg = package.get_message();
364 assert!(append_log_is_successful(msg));
365 let package = in_queue_test.pop().unwrap();
366 let msg = package.get_message();
367 assert!(append_log_is_successful(msg));
368
369 assert_eq!(some_entry(), server.log.get(1).unwrap().data);
370 assert_eq!(None, server.log.get(2));
371 }
372
373 #[test]
374 fn from_leader__then_resets_timer() {
375 let mut builder = ServerBuilder::new();
376 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
377 builder.follower();
378 almost_timeout_follower(&mut server, comm_service); send_append_log(&mut out_queue_test, 0, 0, some_entry());
381 server.tick(comm_service);
382
383 assert!(follower_didnt_start_election(&mut in_queue_test))
384 }
385
386 #[test]
387 fn if_invalid_then_dont_update_commit_idx() {
388 let mut builder = ContextBuilder::new();
389 let mut context: FollowerContext = builder.follower().into();
390
391 context.send_append_log_and_tick(vec![(None, 1)], 2, 0, 32);
392
393 let server = context.inner.get_node();
394 assert_eq!(server.commit_index, 0)
395 }
396
397 #[test]
398 fn with_multiple_entries_if_valid_then_updates_commit_index_with_last_new_entry_idx() {
399 let mut builder = ContextBuilder::new();
400 let mut context: FollowerContext = builder.follower().into();
401
402 let prev_log_term = 1;
403 let entries = vec![
404 (None, prev_log_term),
405 (None, prev_log_term),
406 (None, prev_log_term),
407 ];
408 context.send_append_log_and_tick(entries, 0, prev_log_term, 32);
409
410 let server = context.inner.get_node();
411 assert_eq!(server.commit_index, 3)
412 }
413
414 #[test]
415 fn with_single_entry_if_valid_then_updates_commit_index_with_last_new_entry_idx() {
416 let mut builder = ContextBuilder::new();
417 let mut context: FollowerContext = builder.follower().into();
418
419 let prev_log_term = 1;
420 context.send_append_log_and_tick(vec![(None, prev_log_term)], 0, prev_log_term, 32);
421
422 let server = context.inner.get_node();
423 assert_eq!(server.commit_index, 1)
424 }
425
426 #[test]
427 fn if_valid_then_updates_commit_index_with_leader_commit_idx() {
428 let mut builder = ContextBuilder::new();
429 let mut context: FollowerContext = builder.follower().into();
430
431 let prev_log_term = 1;
432 context.send_append_log_and_tick(vec![(None, prev_log_term)], 0, prev_log_term, 32);
433 context.send_append_log_and_tick(vec![(None, prev_log_term)], 1, prev_log_term, 32);
434 context.send_append_log_and_tick(vec![(None, prev_log_term)], 2, prev_log_term, 32);
435 context.send_append_log_and_tick(vec![(None, prev_log_term)], 3, prev_log_term, 1);
436
437 let server = context.inner.get_node();
438 assert_eq!(server.commit_index, 1)
439 }
440
441 mod with_empty_log {
442 use super::*;
443
444 #[test]
445 fn and_entry_has_greater_nextIndex__then_request_fails() {
446 let mut builder = ServerBuilder::new();
447 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
448 builder.follower();
449
450 send_append_log(&mut out_queue_test, 2, 0, some_entry());
451 server.tick(comm_service);
452
453 let package = in_queue_test.pop().unwrap();
454 let msg = package.get_message();
455 assert!(append_log_is_not_successful(msg));
456 }
457
458 #[test]
459 fn and_entry_has_correct_nextIndex_and_term_match__then__request_succeeds() {
460 let mut builder = ServerBuilder::new();
461 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
462 builder.follower();
463
464 send_append_log(&mut out_queue_test, 0, 0, some_entry());
465 server.tick(comm_service);
466
467 let package = in_queue_test.pop().unwrap();
468 let msg = package.get_message();
469 assert!(append_log_is_successful(msg));
470 }
471 }
472
473 mod with_nonempty_log {
474 use super::*;
475
476 #[test]
477 fn when_receives_append_entry_with_correct_nextIndex_but_term_doesnt_match__then_request_fails(
478 ) {
479 let mut builder = ServerBuilder::new();
480 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
481 builder.follower();
482 add_log_entry_to_server(
483 &mut server,
484 &mut out_queue_test,
485 &mut in_queue_test,
486 comm_service,
487 some_entry(),
488 );
489
490 send_append_log(&mut out_queue_test, 1, 99, some_entry());
491 server.tick(comm_service);
492
493 if let AppendLogResponse(AppendLogResponseResult { success, .. }) =
494 in_queue_test.pop().unwrap().get_message()
495 {
496 assert!(!success)
497 } else {
498 fail()
499 }
500 }
501
502 #[test]
503 fn when_receives_append_entry_with_smaller_nextIndex__then_request_succeeds_and_log_is_updated(
504 ) {
505 let mut builder = ServerBuilder::new();
506 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
507 builder.follower();
508
509 add_log_entries_to_server(
510 &mut server,
511 &mut out_queue_test,
512 &mut in_queue_test,
513 comm_service,
514 vec![some_entry(), some_other_entry()],
515 );
516
517 send_append_log(&mut out_queue_test, 1, 1, some_entry());
518 server.tick(comm_service);
519
520 assert!(append_log_is_successful(
521 in_queue_test.pop().unwrap().get_message()
522 ));
523 assert_eq!(some_entry(), server.log.get(1).unwrap().data);
524 assert_eq!(some_entry(), server.log.get(2).unwrap().data);
525 }
526
527 #[test]
528 fn when_receives_append_entry_with_correct_nextIndex_and_term__then_request_succeeds() {
529 let mut builder = ServerBuilder::new();
530 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
531 builder.follower();
532 add_log_entry_to_server(
533 &mut server,
534 &mut out_queue_test,
535 &mut in_queue_test,
536 comm_service,
537 some_entry(),
538 );
539
540 send_append_log(&mut out_queue_test, 1, 1, some_entry());
541 server.tick(comm_service);
542
543 let package = in_queue_test.pop().unwrap();
544 let msg = package.get_message();
545 assert!(append_log_is_successful(msg));
546 }
547 }
548
549 mod receives_new_config {
550 use super::*;
551
552 #[test]
553 fn applies_it_immediately() {
554 let mut builder = ContextBuilder::new();
555 let mut context = builder.follower();
556
557 let mut new_config = context.current_config();
558 new_config.remove(2);
559 let entry =
560 LogEntry::<TestState>::with_config(context.term(), Some(new_config.clone()));
561 let mut log = Log::new();
562 log.push(entry);
563 let args = AppendLogArgs {
564 term: 0,
565 prev_log_index: 0,
566 prev_log_term: 0,
567 entries: log,
568 leader_commit: 0,
569 };
570 context.empty_queue();
571 context.send_and_tick(context.package(AppendLog(args)));
572 assert_eq!(
573 AppendLogResponse(AppendLogResponseResult {
574 term: 0,
575 success: true
576 }),
577 context.recv_message().unwrap()
578 );
579
580 assert_eq!(2, context.cluster_size());
581 }
582 }
583 }
584
585 mod receives_vote_request {
586 use super::*;
587 use crate::server::test_utils::almost_timeout_follower;
588
589 #[test]
590 fn at_startup_when_granting_vote_request_then_resets_timer() {
591 let mut builder = ServerBuilder::new();
592 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
593 builder.server_in_cluster();
594 almost_timeout_follower(&mut server, comm_service);
595
596 send_request_vote(&mut out_queue_test, 0, 0, 1, 1);
597 server.tick(comm_service);
598
599 assert!(follower_didnt_start_election(&mut in_queue_test))
600 }
601
602 #[test]
603 fn from_candidate_with_smaller_last_term_in_log__then_request_is_rejected() {
604 let mut builder = ServerBuilder::new();
605 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
606 builder.follower();
607 add_log_entry_to_server(
608 &mut server,
609 &mut out_queue_test,
610 &mut in_queue_test,
611 comm_service,
612 some_entry(),
613 );
614
615 send_request_vote(&mut out_queue_test, 0, 0, 0, 0);
616 server.tick(comm_service);
617
618 let package = in_queue_test.pop().unwrap();
619 let msg = package.get_message();
620 assert!(request_vote_not_granted(msg));
621 }
622
623 #[test]
624 fn from_candidate_with_same_last_term_and_shorter_log__then_request_is_rejected() {
625 let mut builder = ServerBuilder::new();
626 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
627 builder.follower();
628 add_log_entry_to_server(
629 &mut server,
630 &mut out_queue_test,
631 &mut in_queue_test,
632 comm_service,
633 some_entry(),
634 );
635 add_log_entry_to_server(
636 &mut server,
637 &mut out_queue_test,
638 &mut in_queue_test,
639 comm_service,
640 some_entry(),
641 );
642
643 send_request_vote(&mut out_queue_test, 0, 0, 0, 0);
644 server.tick(comm_service);
645
646 let package = in_queue_test.pop().unwrap();
647 let msg = package.get_message();
648 assert!(request_vote_not_granted(msg));
649 }
650
651 mod with_nonempty_log {
652 use super::*;
653
654 #[test]
655 fn candidate_has_valid_log__and_voted_granted_to_no_other__then_vote_is_granted() {
656 let mut builder = ServerBuilder::new();
657 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
658 builder.follower();
659 add_log_entry_to_server(
660 &mut server,
661 &mut out_queue_test,
662 &mut in_queue_test,
663 comm_service,
664 some_entry(),
665 );
666 add_log_entry_to_server(
667 &mut server,
668 &mut out_queue_test,
669 &mut in_queue_test,
670 comm_service,
671 some_entry(),
672 );
673
674 send_request_vote(&mut out_queue_test, 2, 1, 1, 0);
675 server.tick(comm_service);
676
677 let package = in_queue_test.pop().unwrap();
678 let msg = package.get_message();
679 assert!(request_vote_granted(msg));
680 }
681
682 #[test]
683 fn candidate_has_valid_log__and_voted_granted_to_other__then_vote_is_denied() {
684 let mut builder = ServerBuilder::new();
685 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
686 builder.follower();
687 add_log_entry_to_server(
688 &mut server,
689 &mut out_queue_test,
690 &mut in_queue_test,
691 comm_service,
692 some_entry(),
693 );
694 add_log_entry_to_server(
695 &mut server,
696 &mut out_queue_test,
697 &mut in_queue_test,
698 comm_service,
699 some_entry(),
700 );
701
702 send_request_vote(&mut out_queue_test, 2, 0, 0, 0);
703 server.tick(comm_service);
704 in_queue_test.pop().unwrap();
705
706 send_request_vote(&mut out_queue_test, 2, 0, 0, 2);
707 server.tick(comm_service);
708
709 let package = in_queue_test.pop().unwrap();
710 let msg = package.get_message();
711 assert!(request_vote_not_granted(msg));
712 }
713 }
714 }
715
716 mod receives_client_interaction {
717 use crate::messages::RaftMessage::{ReadRequestReply, WriteRequestReply};
718 use crate::messages::{ReadRequestReplyArgs};
719 use crate::server::test_server_builder::ServerBuilder;
720 use crate::server::test_utils::{send_read_request, send_write_request};
721 use dcs::communication::connection::InMsgQueue;
722 use dcs::nodes::SystemNodeId;
723 use dcs::rules::measurements::Measurement;
724 use dcs::rules::measurements::ClusterType::TEMPERATURE;
725
726 #[test]
727 fn receives_write_request__then_redirects_to_leader() {
728 let leader_id = SystemNodeId::from(0);
729 let mut builder = ServerBuilder::new();
730 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
731 builder.follower();
732
733 send_write_request(
734 &mut server,
735 &mut out_queue_test,
736 Measurement::new(TEMPERATURE, 0),
737 comm_service,
738 );
739 let answer = in_queue_test.pop().unwrap();
740 if WriteRequestReply(false, Some(leader_id)) != answer.body {
741 unreachable!("failed")
742 }
743 }
744
745 #[test]
746 fn receives_read_request__then_redirects_to_leader() {
747 let leader_id = SystemNodeId::from(0);
748 let mut builder = ServerBuilder::new();
749 let (mut in_queue_test, mut out_queue_test, mut server, _cluster, comm_service) =
750 builder.follower();
751
752 send_read_request(&mut server, &mut out_queue_test, comm_service);
753 let answer = in_queue_test.pop().unwrap();
754 if ReadRequestReply(ReadRequestReplyArgs {
755 success: false,
756 data: None,
757 redirect: Some(leader_id),
758 }) != answer.body
759 {
760 unreachable!("failed")
761 }
762 }
763 }
764
765 mod snapshots {
766 use super::*;
767
768 #[test]
769 pub fn snapshot_occurs_at_75_percent_capacity() {
770 let mut builder = ContextBuilder::new();
771 let mut context: FollowerContext = builder.follower().into();
772
773 let (pre_snapshot_capacity, post_snapshot_capacity) =
774 context.trigger_snapshot_at_75_percent_occupation();
775
776 assert!(pre_snapshot_capacity < post_snapshot_capacity);
777 }
778
779 #[test]
780 pub fn can_receive_append_log_after_snapshot() {
781 let mut builder = ContextBuilder::new();
782 let mut context: FollowerContext = builder.follower().into();
783 context.trigger_snapshot_at_75_percent_occupation();
784 context.inner.empty_queue();
785
786 context.send_append_log_and_tick(vec![(Some(TestState(3, 4)), 0)], 29, 1, u32::MAX);
787
788 let msg = context.inner.recv_message().unwrap();
789 assert!(append_log_is_successful(&msg));
790 let log = context.inner.get_node().log.clone();
791 assert_eq!(TestState(3, 4), log.last().unwrap().data.unwrap());
792 }
793 }
794
795 pub mod given_install_snapshot {
796 use dcs::nodes::SystemNodeId;
797 use super::*;
798
799 #[test]
800 pub fn updates_the_log() {
801 let mut builder = ContextBuilder::new();
802 let mut context: FollowerContext = builder.follower().into();
803
804 let snapshot_entry = context.install_snapshot().data;
805
806 assert_eq!(snapshot_entry, context.last_log_entry())
807 }
808
809 #[test]
810 pub fn updates_leader_id() {
811 let mut builder = ContextBuilder::new();
812 let mut context: FollowerContext = builder.follower().into();
813 context.install_snapshot();
814
815 context.inner.empty_queue();
816 context.inner.send_and_tick(
817 context
818 .inner
819 .package(WriteRequest(WriteRequestArgs::with_measurement(4.into(), Measurement::new(TEMPERATURE, 1)))),
820 );
821
822 let msg = context.inner.recv_message().unwrap();
823
824 let id = Some(SystemNodeId::from(1));
825 assert!(
826 matches!(msg, RaftMessage::WriteRequestReply(false, id)),
827 "Did not expect message {msg:?}"
828 );
829 }
830
831 #[test]
832 pub fn replaces_up_to_last_included_entry_with_snapshot() {
833 let mut builder = ContextBuilder::new();
834 let mut context: FollowerContext = builder.follower().into();
835 let prev_log_term = 1;
836 context.send_append_log_and_tick(
837 vec![(Some(TestState(1, 1)), prev_log_term)],
838 0,
839 prev_log_term,
840 32,
841 );
842 context.send_append_log_and_tick(
843 vec![(Some(TestState(2, 2)), prev_log_term)],
844 1,
845 prev_log_term,
846 32,
847 );
848 context.send_append_log_and_tick(
849 vec![(Some(TestState(3, 3)), prev_log_term)],
850 2,
851 prev_log_term,
852 32,
853 );
854 context.send_append_log_and_tick(
855 vec![(Some(TestState(4, 4)), prev_log_term)],
856 3,
857 prev_log_term,
858 32,
859 );
860
861 let log_before_snapshot = context.log();
862
863 let install_snapshot = context.install_snapshot();
864
865 let log_after_snapshot = context.log();
866
867 let len_after_entries_removed =
868 log_before_snapshot.len() - install_snapshot.last_included_index as usize;
869 let snapshot_entries_added = 1;
870 assert_eq!(
871 len_after_entries_removed + snapshot_entries_added,
872 log_after_snapshot.len()
873 )
874 }
875
876 #[test]
877 pub fn if_term_is_less_than_current_term_responds_immediately_and_dont_update() {
878 let mut builder = ContextBuilder::new();
879 let mut context: FollowerContext = builder.follower().into();
880 let prev_log_term = 1;
881 context.send_append_log_and_tick(
882 vec![(Some(TestState(1, 1)), prev_log_term)],
883 0,
884 prev_log_term,
885 32,
886 );
887 context.send_append_log_and_tick(
888 vec![(Some(TestState(2, 2)), prev_log_term)],
889 1,
890 prev_log_term,
891 32,
892 );
893 context.send_append_log_and_tick(
894 vec![(Some(TestState(3, 3)), prev_log_term)],
895 2,
896 prev_log_term,
897 32,
898 );
899 context.send_append_log_and_tick(
900 vec![(Some(TestState(4, 4)), prev_log_term)],
901 3,
902 prev_log_term,
903 32,
904 );
905 context.inner.empty_queue();
906 let log_before = context.log();
907
908 context.set_term(15);
909 let install_snapshot = context.install_snapshot();
910
911 let log_after = context.log();
912 let message = context.inner.recv_message().unwrap();
913 assert!(matches!(message, RaftMessage::InstallSnapshotResponse(15)));
914 assert_eq!(log_before, log_after);
915 }
916
917 #[test]
918 pub fn responds_with_current_term() {
919 let mut builder = ContextBuilder::new();
920 let mut context: FollowerContext = builder.follower().into();
921 let prev_log_term = 1;
922 context.send_append_log_and_tick(
923 vec![(Some(TestState(1, 1)), prev_log_term)],
924 0,
925 prev_log_term,
926 32,
927 );
928 context.send_append_log_and_tick(
929 vec![(Some(TestState(2, 2)), prev_log_term)],
930 1,
931 prev_log_term,
932 32,
933 );
934 context.send_append_log_and_tick(
935 vec![(Some(TestState(3, 3)), prev_log_term)],
936 2,
937 prev_log_term,
938 32,
939 );
940 context.send_append_log_and_tick(
941 vec![(Some(TestState(4, 4)), prev_log_term)],
942 3,
943 prev_log_term,
944 32,
945 );
946 context.inner.empty_queue();
947
948 context.set_term(5);
949 let install_snapshot = context.install_snapshot();
950
951 let message = context.inner.recv_message().unwrap();
952 assert!(
953 matches!(message, RaftMessage::InstallSnapshotResponse(5)),
954 "Wasn't expecting {message:?}"
955 );
956 }
957 }
958
959 pub fn some_other_entry() -> Option<TestState> {
960 Some(TestState(37, 12))
961 }
962
963 fn append_log_is_successful(msg: &RaftMessage<TestState>) -> bool {
964 matches!(msg, AppendLogResponse(AppendLogResponseResult { success, .. }) if *success)
965 }
966
967 fn append_log_is_not_successful(msg: &RaftMessage<TestState>) -> bool {
968 matches!(msg, AppendLogResponse(AppendLogResponseResult { success, .. }) if !success)
969 }
970
971 fn send_append_log<T: LogData + Debug>(
972 out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
973 prev_log_index: LogIndex,
974 prev_log_term: Term,
975 data: Option<T>,
976 ) {
977 let mut entries: Log<T> = Default::default();
978 let _ = entries.push(LogEntry::with_data(prev_log_term, data));
979
980 let append_log = AppendLog(AppendLogArgs {
981 term: 1,
982 prev_log_index,
983 prev_log_term,
984 entries,
985 leader_commit: 0,
986 });
987
988 let append_log_req = package(append_log)
989 .from(0.into())
990 .to(1.into())
991 .build()
992 .unwrap();
993
994 out_queue_test.push(append_log_req);
995 }
996
997 fn add_log_entries_to_server<L: LogData + Debug>(
998 server: &mut RaftService<FakeTimer, L>,
999 out_queue_test: &mut dyn OutMsgQueue<RaftPackage<L>>,
1000 in_queue_test: &mut dyn InMsgQueue<RaftPackage<L>>,
1001 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
1002 entry: Vec<Option<L>>,
1003 ) {
1004 for (idx, e) in entry.into_iter().enumerate() {
1005 send_append_log(out_queue_test, idx as LogIndex, 1, e);
1006 server.tick(comm_service);
1007 in_queue_test.pop().unwrap();
1008 }
1009 }
1010
1011 fn add_log_entry_to_server<L: LogData + Debug>(
1012 server: &mut RaftService<FakeTimer, L>,
1013 out_queue_test: &mut dyn OutMsgQueue<RaftPackage<L>>,
1014 in_queue_test: &mut dyn InMsgQueue<RaftPackage<L>>,
1015 comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
1016 entry: Option<L>,
1017 ) {
1018 send_append_log(out_queue_test, 0, 1, entry);
1019 server.tick(comm_service);
1020 in_queue_test.pop().unwrap();
1021 }
1022
1023 fn request_vote_granted(msg: &RaftMessage<TestState>) -> bool {
1024 matches!(msg, RequestVoteResponse(RequestVoteResponseResult { granted, .. }) if *granted)
1025 }
1026
1027 fn request_vote_not_granted(msg: &RaftMessage<TestState>) -> bool {
1028 matches!(msg, RequestVoteResponse(RequestVoteResponseResult { granted, .. }) if !granted)
1029 }
1030
1031 fn send_request_vote<T: LogData + Debug>(
1032 out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
1033 prev_log_index: LogIndex,
1034 prev_log_term: Term,
1035 term: Term,
1036 from_id: u32,
1037 ) {
1038 let request_vote = RequestVote(RequestVoteArgs {
1039 term,
1040 prev_log_index,
1041 prev_log_term,
1042 });
1043 let append_log_req = package(request_vote)
1044 .from(SystemNodeId::from(from_id))
1045 .to(1.into())
1046 .build()
1047 .unwrap();
1048 out_queue_test.push(append_log_req);
1049 }
1050
1051 pub fn follower_didnt_start_election(
1052 comm_service: &mut dyn InMsgQueue<RaftPackage<TestState>>,
1053 ) -> bool {
1054 let append_log_response =
1055 RaftMessage::<TestState>::AppendLogResponse(AppendLogResponseResult {
1056 term: 0,
1057 success: true,
1058 });
1059 let actual_append_log_response = comm_service.pop().unwrap().body;
1060 matches!(append_log_response, actual_append_log_response) && comm_service.pop().is_none()
1061 }
1062}
1063
1064#[cfg(test)]
1065mod test_utils {
1066 use dcs::nodes::SystemNodeId;
1067 use crate::messages::RaftMessage::{AppendLog, InstallSnapshot};
1068 use crate::messages::{
1069 AppendLogArgs, InstallSnapshotArgs, Log, LogEntry, LogIndex, Term, LOG_LEN,
1070 };
1071 use crate::server::leader::test_utils::LeaderContext;
1072 use crate::server::test_server_builder::TestContext;
1073 use crate::server::test_utils::TestState;
1074 use dcs::rules::measurements::Measurement;
1075 use dcs::rules::measurements::ClusterType::TEMPERATURE;
1076
1077 pub struct FollowerContext<'a> {
1078 pub inner: TestContext<'a>,
1079 }
1080
1081 impl<'a> FollowerContext<'a> {
1082 pub fn set_term(&mut self, term: Term) {
1083 self.inner.get_node_mut().term = term;
1084 }
1085
1086 pub(crate) fn trigger_snapshot_at_75_percent_occupation(&mut self) -> (f32, f32) {
1087 let count_to_fill_log = LOG_LEN as f32 * 0.74;
1088 for i in 0..count_to_fill_log.floor() as u32 {
1089 self.send_append_log_and_tick(vec![(Some(TestState(2, 2)), 1)], i, 1, u32::MAX);
1090 }
1091 let pre_snapshot_capacity = self.inner.get_node().log.capacity();
1092 let count_to_snapshot = count_to_fill_log * 1.1;
1093 for i in count_to_fill_log as u32..count_to_snapshot.ceil() as u32 {
1094 self.send_append_log_and_tick(vec![(Some(TestState(2, 2)), 1)], i, 1, u32::MAX);
1095 }
1096 let post_snapshot_capacity = self.inner.get_node().log.capacity();
1097 (pre_snapshot_capacity, post_snapshot_capacity)
1098 }
1099
1100 pub fn send_append_log_and_tick(
1101 &mut self,
1102 data: Vec<(Option<TestState>, Term)>,
1103 prev_log_index: LogIndex,
1104 prev_log_term: Term,
1105 leader_commit: LogIndex,
1106 ) {
1107 let mut entries: Log<TestState> = Default::default();
1108 for datum in data {
1109 let _ = entries.push(LogEntry::with_data(datum.1, datum.0));
1110 }
1111
1112 let append_log = AppendLog(AppendLogArgs {
1113 term: 1,
1114 prev_log_index,
1115 prev_log_term,
1116 entries: entries.clone(),
1117 leader_commit,
1118 });
1119
1120 self.inner.send_and_tick(self.inner.package(append_log));
1121 }
1122
1123 pub fn install_snapshot(&mut self) -> InstallSnapshotArgs<TestState> {
1124 let snapshot_entry = LogEntry::new(7, Some(TestState(5, 7)), None, None);
1125
1126 let args = InstallSnapshotArgs {
1127 term: snapshot_entry.term,
1128 leader_id: SystemNodeId::from(1),
1129 last_included_index: 3,
1130 last_included_term: 1,
1131 data: snapshot_entry,
1132 };
1133 let install_snapshot = InstallSnapshot(args.clone());
1134 self.inner
1135 .send_and_tick(self.inner.package(install_snapshot));
1136 args
1137 }
1138
1139 pub fn last_log_entry(&self) -> LogEntry<TestState> {
1140 self.inner.get_node().log.clone().last().unwrap().clone()
1141 }
1142
1143 pub fn log(&self) -> Log<TestState> {
1144 self.inner.get_node().log.clone()
1145 }
1146 }
1147
1148 impl<'a> From<TestContext<'a>> for FollowerContext<'a> {
1149 fn from(context: TestContext<'a>) -> Self {
1150 Self { inner: context }
1151 }
1152 }
1153}
1154
1155#[macro_export]
1156macro_rules! initialize_follower {
1157 ($in_queue_test:ident, $out_queue_test:ident, $server:ident ) => {
1158 let mut timer: FakeTimer = FakeTimer::new();
1159 timer.reset_election_timer();
1160 timer.reset_election_timer();
1161 let (mut $in_queue_test, mut out_queue_server) = FakeMsgQueue::new();
1162 let (mut in_queue_server, mut $out_queue_test) = FakeMsgQueue::new();
1163 let mut cluster: LinearMap<ClusterMemberId, ClusterMember, CLUSTER_NODE_COUNT> =
1164 LinearMap::new();
1165 let _ = cluster.insert(
1166 0,
1167 ClusterMember {
1168 id: 0,
1169 vote_granted: Abstained,
1170 match_idx: 0,
1171 next_idx: 0,
1172 },
1173 );
1174 let _ = cluster.insert(
1175 2,
1176 ClusterMember {
1177 id: 2,
1178 vote_granted: Abstained,
1179 match_idx: 0,
1180 next_idx: 0,
1181 },
1182 );
1183 let mut $server = Server::new(
1184 &mut timer,
1185 &mut out_queue_server,
1186 &mut in_queue_server,
1187 cluster,
1188 );
1189 $server.id = 1;
1190 $server.leader_id = 0;
1191 };
1192}