1#![deny(missing_docs)]
2
3use crate::disseminated::Disseminated;
4use crate::incoming_message::{DisseminationMessageIn, IncomingMessage, PingRequestMessageIn};
5use crate::member::{Member, MemberId};
6use crate::message::MessageType;
7use crate::message_decoder::decode_message;
8use crate::message_encoder::{DisseminationMessageEncoder, OutgoingMessage, PingRequestMessageEncoder};
9use crate::notification::Notification;
10use crate::result::Result;
11use crate::suspicion::Suspicion;
12use crate::ProtocolConfig;
13use failure::{format_err, ResultExt};
14use mio::net::UdpSocket;
15use mio::{Event, Events, Poll, PollOpt, Ready, Token};
16use mio_extras::channel::{Receiver, Sender};
17use rand::rngs::SmallRng;
18use rand::seq::SliceRandom;
19use rand::SeedableRng;
20use slog::{debug, info, warn};
21use std::collections::{HashMap, VecDeque};
22use std::fmt;
23use std::net::SocketAddr;
24use std::time::Duration;
25
/// A decoded incoming message together with the address it was received from.
struct IncomingLetter {
    /// Address of the peer the datagram came from.
    sender: SocketAddr,
    /// The decoded message.
    message: IncomingMessage,
}
30
31impl fmt::Debug for IncomingLetter {
32 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
33 write!(
34 f,
35 "IncomingLetter {{ sender: {:#?}, message: {:#?} }}",
36 self.sender, self.message
37 )
38 }
39}
40
/// Minimal addressing information for a request: the member it concerns and
/// the sequence number the message carries.
#[derive(Debug)]
struct Header {
    /// Identifier of the member the request is sent to.
    member_id: MemberId,
    /// Sequence number placed in the outgoing message.
    sequence_number: u64,
}
46
/// A request that is waiting for an acknowledgement, stamped with the time
/// the wait started.
#[derive(Debug)]
struct Ack {
    /// The request whose answer is awaited.
    request: Request,
    /// When this record was created; used to decide when the wait has timed out.
    request_time: std::time::Instant,
}
52
53impl Ack {
54 fn new(request: Request) -> Self {
55 Ack {
56 request,
57 request_time: std::time::Instant::now(),
58 }
59 }
60}
61
/// A ping this node sends to `target` because `sender` asked for it.
#[derive(Debug)]
struct PingProxyRequest {
    /// Member that requested the ping; receives the indirect ack on success.
    sender: Member,
    /// Member to be pinged.
    target: Member,
    /// Sequence number taken from the ping request and reused in the ping.
    sequence_number: u64,
}
68
/// An acknowledgement sent to the member that asked this node to ping on its
/// behalf.
#[derive(Debug)]
struct AckIndirectRequest {
    /// Member the acknowledgement is sent to.
    target: Member,
    /// Sequence number of the ping request being acknowledged.
    sequence_number: u64,
}
74
/// Outgoing work items queued by the node and sent when the socket is writable.
#[derive(Debug)]
enum Request {
    /// Initial ping (sequence number 0) sent to the given address in order to join.
    Init(SocketAddr),
    /// Direct ping to a known member.
    Ping(Header),
    /// Ask other members to ping the member named in the header.
    PingIndirect(Header),
    /// Ping a member on behalf of another member.
    PingProxy(PingProxyRequest),
    /// Acknowledge a ping received from the member named in the header.
    Ack(Header),
    /// Acknowledge a proxied ping back to the member that requested it.
    AckIndirect(AckIndirectRequest),
}
84
/// Control messages delivered to a running node through its channel.
#[derive(Debug)]
pub(crate) enum ChannelMessage {
    /// Ask the node to leave its main loop.
    Stop,
    /// Ask for the addresses of this node and all members it knows; the list
    /// is sent back through the enclosed sender.
    GetMembers(std::sync::mpsc::Sender<Vec<SocketAddr>>),
}
90
/// A deferred action: `what` is called with the node once `when` has been reached.
struct Timeout<F: FnOnce(&mut SyncNode)> {
    /// Earliest instant at which the action may run.
    when: std::time::Instant,
    /// The action itself; consumed when it runs.
    what: F,
}
96
/// State of a single gossip node driven by a blocking event loop.
pub(crate) struct SyncNode {
    /// Protocol timing and fan-out settings.
    config: ProtocolConfig,
    /// UDP socket; `None` until the node has been bound.
    udp: Option<UdpSocket>,
    /// Member ids in the order they are pinged.
    ping_order: Vec<MemberId>,
    /// Ids of members included in the broadcast section of outgoing messages.
    broadcast: Disseminated<MemberId>,
    /// Notifications included in outgoing messages.
    notifications: Disseminated<Notification>,
    /// Known members keyed by id; this node itself is not stored here.
    members: HashMap<MemberId, Member>,
    /// Index into `ping_order` of the next member to ping.
    next_member_index: usize,
    /// Number of protocol periods started so far.
    epoch: u64,
    /// Next sequence number to hand out.
    sequence_number: u64,
    /// Scratch buffer incoming datagrams are received into.
    recv_buffer: Vec<u8>,
    /// This node's own member record.
    myself: Member,
    /// Queue of requests waiting to be sent.
    requests: VecDeque<Request>,
    /// Receiving half of the control channel.
    receiver: Receiver<ChannelMessage>,
    /// Requests that have been sent and are awaiting an acknowledgement.
    acks: Vec<Ack>,
    /// Random number generator used to shuffle `ping_order`.
    rng: SmallRng,
    /// Currently suspected members, oldest suspicion first.
    suspicions: VecDeque<Suspicion>,
    /// Deferred actions waiting for their deadline.
    timeouts: Vec<Timeout<Box<dyn FnOnce(&mut SyncNode) + Send>>>,
    /// Logger used for all diagnostics of this node.
    logger: slog::Logger,
}
118
119impl SyncNode {
120 pub(crate) fn new(bind_address: SocketAddr, config: ProtocolConfig) -> (SyncNode, Sender<ChannelMessage>) {
121 let (sender, receiver) = mio_extras::channel::channel();
122 let gossip = SyncNode {
123 config,
124 udp: None,
125 ping_order: vec![],
126 broadcast: Disseminated::new(),
127 notifications: Disseminated::with_limit(20),
128 members: HashMap::new(),
129 next_member_index: 0,
130 epoch: 0,
131 sequence_number: 0,
132 recv_buffer: vec![0u8; 1500],
133 myself: Member::new(bind_address),
134 requests: VecDeque::<Request>::with_capacity(32),
135 receiver,
136 acks: Vec::<Ack>::with_capacity(32),
137 rng: SmallRng::from_entropy(),
138 suspicions: VecDeque::new(),
139 timeouts: Vec::new(),
140 logger: slog::Logger::root(slog::Discard, slog::o!()),
141 };
142 (gossip, sender)
143 }
144
145 pub(crate) fn set_logger(&mut self, logger: slog::Logger) {
146 self.logger = logger.new(slog::o!("id" => self.myself.id.to_string()));
147 }
148
149 pub(crate) fn start(&mut self) -> Result<()> {
150 let poll = Poll::new().unwrap();
151 poll.register(&self.receiver, Token(1), Ready::readable(), PollOpt::empty())?;
152 self.bind(&poll)?;
153
154 let mut events = Events::with_capacity(1024);
155 let mut last_epoch_time = std::time::Instant::now();
156
157 'mainloop: loop {
158 poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
159 for event in events.iter() {
160 match event.token() {
161 Token(0) => {
162 if let Err(e) = self.handle_protocol_event(&event) {
163 slog::warn!(self.logger, "Failed to process protocol event: {:?}", e);
164 }
165 }
166 Token(1) => match self.receiver.try_recv() {
167 Ok(message) => {
168 debug!(self.logger, "ChannelMessage::{:?}", message);
169 match message {
170 ChannelMessage::Stop => {
171 break 'mainloop;
172 }
173 ChannelMessage::GetMembers(sender) => {
174 let members = std::iter::once(&self.myself.address)
175 .chain(self.members.values().map(|m| &m.address))
176 .cloned()
177 .collect::<Vec<_>>();
178 if let Err(e) = sender.send(members) {
179 warn!(self.logger, "Failed to send list of members: {:?}", e);
180 }
181 }
182 }
183 }
184 Err(e) => {
185 debug!(self.logger, "Not ready yet: {:?}", e);
186 }
187 },
188 _ => unreachable!(),
189 }
190 }
191
192 self.handle_acks()?;
193
194 self.drain_timeout_suspicions()
195 .into_iter()
196 .for_each(|s| self.handle_timeout_suspicion(&s));
197
198 let now = std::time::Instant::now();
199 if now > (last_epoch_time + Duration::from_secs(self.config.protocol_period)) {
200 debug!(self.logger, "Notifications: {:?}", self.notifications);
202 debug!(self.logger, "Broadcast: {:?}", self.broadcast);
203
204 self.advance_epoch();
205 last_epoch_time = now;
206 }
207
208 self.handle_timeouts();
209 }
210
211 Ok(())
212 }
213
214 fn handle_acks(&mut self) -> Result<()> {
215 let now = std::time::Instant::now();
216 let ack_timeout = Duration::from_secs(self.config.ack_timeout as u64);
217 let (handle, postpone): (Vec<_>, Vec<_>) = self
218 .acks
219 .drain(..)
220 .partition(|ack| ack.request_time + ack_timeout <= now);
221 handle.into_iter().try_for_each(|ack| self.handle_timeout_ack(ack))?;
222 self.acks = postpone;
223 Ok(())
224 }
225
226 fn handle_timeouts(&mut self) {
227 let now = std::time::Instant::now();
228 let (handle, postpone): (Vec<_>, Vec<_>) = self.timeouts.drain(..).partition(|t| t.when <= now);
229 handle.into_iter().for_each(|t| (t.what)(self));
230 self.timeouts = postpone;
231 }
232
233 fn drain_timeout_suspicions(&mut self) -> Vec<Suspicion> {
234 let mut suspicions = Vec::new();
235 loop {
236 match self.suspicions.front() {
237 Some(ref suspicion) => {
238 if std::time::Instant::now()
239 > (suspicion.created + Duration::from_secs(self.config.suspect_timeout as u64))
240 {
241 suspicions.push(self.suspicions.pop_front().unwrap());
242 } else {
243 break;
244 }
245 }
246 None => break,
247 }
248 }
249 suspicions
250 }
251
252 pub(crate) fn join(&mut self, member: SocketAddr) -> Result<()> {
253 assert_ne!(member, self.myself.address, "Can't join yourself");
254 self.requests.push_front(Request::Init(member));
255 self.start()
256 }
257
258 fn handle_timeout_suspicion(&mut self, suspicion: &Suspicion) {
259 if !self.members.contains_key(&suspicion.member_id) {
261 debug!(self.logger, "Member {} already removed.", suspicion.member_id);
262 return;
263 }
264
265 let confirm = Notification::Confirm {
266 member: self.members[&suspicion.member_id].clone(),
267 };
268 self.notifications.remove(&Notification::Suspect {
269 member: self.members[&suspicion.member_id].clone(),
270 });
271 self.notifications.add(confirm);
272 self.remove_member(&suspicion.member_id)
273 }
274
275 fn advance_epoch(&mut self) {
276 if let Some(member_id) = self.get_next_member() {
277 let ping = Request::Ping(Header {
278 member_id,
279 sequence_number: self.get_next_sequence_number(),
280 });
281 self.requests.push_front(ping);
282 }
283 self.epoch += 1;
284 info!(self.logger, "New epoch: {}", self.epoch);
285 }
286
287 fn handle_timeout_ack(&mut self, ack: Ack) -> Result<()> {
288 match ack.request {
289 Request::Init(address) => {
290 info!(self.logger, "Failed to join {}", address);
291 self.timeouts.push(Timeout {
292 when: std::time::Instant::now() + std::time::Duration::from_secs(self.config.join_retry_timeout),
293 what: Box::new(|myself| myself.requests.push_front(ack.request)),
294 });
295 }
296 Request::Ping(header) => {
297 self.requests.push_back(Request::PingIndirect(header));
298 }
299 Request::PingIndirect(header) => {
300 self.handle_suspect(header.member_id);
301 }
302 Request::PingProxy(request) => {
303 warn!(
304 self.logger,
305 "Ping proxy from {} to {} timed out", request.sender.id, request.target.id
306 );
307 }
308 _ => unreachable!(),
309 }
310 Ok(())
311 }
312
313 fn bind(&mut self, poll: &Poll) -> Result<()> {
314 self.udp = Some(UdpSocket::bind(&self.myself.address).context("Failed to bind UDP socket")?);
315 poll.register(
317 self.udp.as_ref().unwrap(),
318 Token(0),
319 Ready::readable() | Ready::writable(),
320 PollOpt::level(),
321 )
322 .map_err(|e| format_err!("Failed to register UDP socket for polling: {:?}", e))
323 }
324
325 fn send_message(&mut self, target: SocketAddr, message: OutgoingMessage) {
326 debug!(self.logger, "{:?} <- {:?}", target, message);
327 if target == self.myself.address {
332 debug!(self.logger, "Trying to send a message to myself, dropping it");
333 return;
334 }
335 match self.udp.as_ref().unwrap().send_to(message.buffer(), &target) {
336 Err(e) => warn!(self.logger, "Message to {:?} was not delivered due to {:?}", target, e),
337 Ok(count) => {
338 debug!(self.logger, "Send {} bytes", count);
339 if let OutgoingMessage::DisseminationMessage(ref dissemination_message) = message {
340 self.notifications.mark(dissemination_message.num_notifications());
341 self.broadcast.mark(dissemination_message.num_broadcast());
342 }
343 }
344 }
345 }
346
347 fn recv_letter(&mut self) -> Option<IncomingLetter> {
348 match self.udp.as_ref().unwrap().recv_from(&mut self.recv_buffer) {
349 Ok((count, sender)) => {
350 debug!(self.logger, "Received {} bytes from {:?}", count, sender);
351 let message = match decode_message(&self.recv_buffer[..count]) {
352 Ok(message) => message,
353 Err(e) => {
354 warn!(self.logger, "Failed to decode from message {:#?}: {}", sender, e);
355 return None;
356 }
357 };
358 let letter = IncomingLetter { sender, message };
359 debug!(self.logger, "{:?}", letter);
360 Some(letter)
361 }
362 Err(e) => {
363 warn!(self.logger, "Failed to receive letter due to {:?}", e);
364 None
365 }
366 }
367 }
368
369 fn update_members<'m>(&mut self, members: impl Iterator<Item = &'m Member>) {
370 for member in members {
371 self.update_member(member);
372 }
373 }
374
375 fn update_member(&mut self, member: &Member) {
376 let confirmed = self.notifications.iter().find(|n| match n {
378 Notification::Confirm {
379 member: confirmed_member,
380 } if confirmed_member.id == member.id => true,
381 _ => false,
382 });
383 if let Some(member) = confirmed {
384 info!(self.logger, "Member {:?} has already been marked as dead", member);
385 return;
386 }
387 if member.id != self.myself.id && self.members.insert(member.id, member.clone()).is_none() {
388 info!(self.logger, "Member joined: {:?}", member);
389 self.ping_order.push(member.id);
390 self.broadcast.add(member.id);
391 }
392 }
393
394 fn update_notifications<'m>(&mut self, notifications: impl Iterator<Item = &'m Notification>) {
395 for notification in notifications {
398 if self.notifications.iter().find(|&n| n >= notification).is_some() {
399 continue;
400 }
401 match notification {
402 Notification::Confirm { member } => self.remove_member(&member.id),
403 Notification::Alive { member } => self.handle_alive(member),
404 Notification::Suspect { member } => self.handle_suspect(member.id),
405 }
406 let obsolete_notifications = self
407 .notifications
408 .iter()
409 .filter(|&n| n < notification)
410 .cloned()
411 .collect::<Vec<_>>();
412 for n in obsolete_notifications {
413 self.notifications.remove(&n);
414 }
415 self.notifications.add((*notification).clone());
416 }
417 }
418
/// Handles an `Alive` notification by recording `member` through `update_member`.
fn handle_alive(&mut self, member: &Member) {
    self.update_member(member);
}
422
423 fn handle_suspect(&mut self, member_id: MemberId) {
424 match self.members.get(&member_id) {
425 Some(member) => {
426 if self.suspicions.iter().find(|s| s.member_id == member_id).is_none() {
428 info!(self.logger, "Start suspecting member {:?}", member_id);
429 self.suspicions.push_back(Suspicion::new(member_id));
430 self.notifications.add(Notification::Suspect { member: member.clone() });
431 }
432 }
433 None => debug!(
434 self.logger,
435 "Trying to suspect member {:?}, which has already been removed", member_id
436 ),
437 }
438 }
439
440 fn remove_member(&mut self, member_id: &MemberId) {
441 if let Some(removed_member) = self.members.remove(member_id) {
442 let idx = self.ping_order.iter().position(|e| e == member_id).unwrap();
443 self.ping_order.remove(idx);
444 self.broadcast.remove(member_id);
445 if idx <= self.next_member_index && self.next_member_index > 0 {
446 self.next_member_index -= 1;
447 }
448 info!(self.logger, "Member removed: {:?}", removed_member);
449 }
450 }
451
452 fn get_next_member(&mut self) -> Option<MemberId> {
453 if self.ping_order.is_empty() {
454 return None;
455 }
456 if self.next_member_index == 0 {
461 self.ping_order.shuffle(&mut self.rng);
462 }
463 let target = self.ping_order[self.next_member_index];
464 self.next_member_index = (self.next_member_index + 1) % self.ping_order.len();
465 Some(target)
466 }
467
468 fn get_next_sequence_number(&mut self) -> u64 {
469 let sequence_number = self.sequence_number;
470 self.sequence_number += 1;
471 sequence_number
472 }
473
474 fn handle_protocol_event(&mut self, event: &Event) -> Result<()> {
475 if event.readiness().is_readable() {
476 if let Some(letter) = self.recv_letter() {
477 match letter.message {
478 IncomingMessage::Ping(m) => self.handle_ping(&m),
479 IncomingMessage::Ack(m) => self.handle_ack(&m),
480 IncomingMessage::PingRequest(m) => self.handle_indirect_ping(&m),
481 }
482 }
483 } else if event.readiness().is_writable() {
484 if let Some(request) = self.requests.pop_front() {
485 debug!(self.logger, "{:?}", request);
486 match request {
487 Request::Init(address) => {
488 let message = DisseminationMessageEncoder::new(1024)
489 .message_type(MessageType::Ping)?
490 .sender(&self.myself)?
491 .sequence_number(0)?
492 .encode();
493 self.send_message(address, message);
494 self.acks.push(Ack::new(request));
495 }
496 Request::Ping(ref header) if self.members.contains_key(&header.member_id) => {
497 let message = DisseminationMessageEncoder::new(1024)
498 .message_type(MessageType::Ping)?
499 .sender(&self.myself)?
500 .sequence_number(header.sequence_number)?
501 .notifications(self.notifications.iter())?
502 .broadcast(self.broadcast.iter().map(|id| &self.members[id]))?
503 .encode();
504 self.send_message(self.members[&header.member_id].address, message);
505 self.acks.push(Ack::new(request));
506 }
507 Request::Ping(ref header) => {
508 info!(
509 self.logger,
510 "Dropping Ping message, member {} has already been removed.", header.member_id
511 );
512 }
513 Request::PingIndirect(ref header) if self.members.contains_key(&header.member_id) => {
514 let indirect_members = self
515 .members
516 .keys()
517 .filter(|&key| *key != header.member_id)
518 .take(self.config.num_indirect as usize)
519 .cloned()
520 .collect::<Vec<_>>();
521 indirect_members.iter().try_for_each(|member_id| -> Result<()> {
522 let message = PingRequestMessageEncoder::new()
523 .sender(&self.myself)?
524 .sequence_number(header.sequence_number)?
525 .target(&self.members[&header.member_id])?
526 .encode();
527 self.send_message(self.members[member_id].address, message);
528 Ok(())
529 })?;
530 self.acks.push(Ack::new(request));
531 }
532 Request::PingIndirect(ref header) => {
533 info!(
534 self.logger,
535 "Dropping PingIndirect message, member {} has already been removed.", header.member_id
536 );
537 }
538 Request::PingProxy(ref ping_proxy) => {
539 let message = DisseminationMessageEncoder::new(1024)
540 .message_type(MessageType::Ping)?
541 .sender(&self.myself)?
542 .sequence_number(ping_proxy.sequence_number)?
543 .notifications(self.notifications.iter())?
544 .broadcast(self.broadcast.iter().map(|id| &self.members[id]))?
545 .encode();
546 self.send_message(ping_proxy.target.address, message);
547 self.acks.push(Ack::new(request));
548 }
549 Request::Ack(header) => {
550 let message = DisseminationMessageEncoder::new(1024)
551 .message_type(MessageType::PingAck)?
552 .sender(&self.myself)?
553 .sequence_number(header.sequence_number)?
554 .notifications(self.notifications.iter())?
555 .broadcast(self.broadcast.iter().map(|id| &self.members[id]))?
556 .encode();
557 self.send_message(self.members[&header.member_id].address, message);
558 }
559 Request::AckIndirect(ack_indirect) => {
560 let message = DisseminationMessageEncoder::new(1024)
561 .message_type(MessageType::PingAck)?
562 .sender(&self.myself)?
563 .sequence_number(ack_indirect.sequence_number)?
564 .encode();
565 self.send_message(ack_indirect.target.address, message);
566 }
567 }
568 }
569 }
570 Ok(())
571 }
572
/// Applies everything an incoming dissemination message carries to local
/// state: first its notifications, then its sender, then its broadcast
/// members.
fn update_state(&mut self, message: &DisseminationMessageIn) {
    self.update_notifications(message.notifications.iter());
    self.update_member(&message.sender);
    self.update_members(message.broadcast.iter());
}
578
579 fn handle_ack(&mut self, message: &DisseminationMessageIn) {
580 for ack in self.acks.drain(..).collect::<Vec<_>>() {
581 match ack.request {
582 Request::Init(address) => {
583 self.update_state(message);
584 if message.sender.address != address || message.sequence_number != 0 {
585 panic!("Initial ping request failed, unable to continue");
586 }
587 continue;
588 }
589 Request::PingIndirect(ref header) => {
590 self.update_state(message);
591 if message.sender.id == header.member_id && message.sequence_number == header.sequence_number {
592 continue;
593 }
594 }
595 Request::PingProxy(ref ping_proxy) => {
596 if message.sender.id == ping_proxy.target.id
597 && message.sequence_number == ping_proxy.sequence_number
598 {
599 self.requests.push_back(Request::AckIndirect(AckIndirectRequest {
600 target: ping_proxy.sender.clone(),
601 sequence_number: ping_proxy.sequence_number,
602 }));
603 continue;
604 }
605 }
606 Request::Ping(ref header) => {
607 self.update_state(message);
608 if message.sender.id == header.member_id && message.sequence_number == header.sequence_number {
609 continue;
610 }
611 }
612 _ => unreachable!(),
613 }
614 self.acks.push(ack);
615 }
616 }
617
618 fn handle_ping(&mut self, message: &DisseminationMessageIn) {
619 self.update_state(message);
620 self.requests.push_back(Request::Ack(Header {
621 member_id: message.sender.id,
622 sequence_number: message.sequence_number,
623 }));
624 }
625
626 fn handle_indirect_ping(&mut self, message: &PingRequestMessageIn) {
627 self.requests.push_back(Request::PingProxy(PingProxyRequest {
628 sender: message.sender.clone(),
629 target: message.target.clone(),
630 sequence_number: message.sequence_number,
631 }));
632 }
633}