Skip to main content

membership/
sync_node.rs

1#![deny(missing_docs)]
2
3use crate::disseminated::Disseminated;
4use crate::incoming_message::{DisseminationMessageIn, IncomingMessage, PingRequestMessageIn};
5use crate::member::{Member, MemberId};
6use crate::message::MessageType;
7use crate::message_decoder::decode_message;
8use crate::message_encoder::{DisseminationMessageEncoder, OutgoingMessage, PingRequestMessageEncoder};
9use crate::notification::Notification;
10use crate::result::Result;
11use crate::suspicion::Suspicion;
12use crate::ProtocolConfig;
13use failure::{format_err, ResultExt};
14use mio::net::UdpSocket;
15use mio::{Event, Events, Poll, PollOpt, Ready, Token};
16use mio_extras::channel::{Receiver, Sender};
17use rand::rngs::SmallRng;
18use rand::seq::SliceRandom;
19use rand::SeedableRng;
20use slog::{debug, info, warn};
21use std::collections::{HashMap, VecDeque};
22use std::fmt;
23use std::net::SocketAddr;
24use std::time::Duration;
25
/// A decoded datagram together with the address it was received from.
struct IncomingLetter {
    /// Source address reported by `recv_from`.
    sender: SocketAddr,
    /// The message decoded from the datagram's bytes.
    message: IncomingMessage,
}
30
31impl fmt::Debug for IncomingLetter {
32    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
33        write!(
34            f,
35            "IncomingLetter {{ sender: {:#?}, message: {:#?} }}",
36            self.sender, self.message
37        )
38    }
39}
40
/// Addressing information shared by `Ping`, `PingIndirect` and `Ack` requests.
#[derive(Debug)]
struct Header {
    /// Member the request concerns (the probed member, or the member being acked).
    member_id: MemberId,
    /// Sequence number carried by the message; acks are matched against it in `handle_ack`.
    sequence_number: u64,
}
46
/// A request that has been sent and is waiting for an acknowledgement.
#[derive(Debug)]
struct Ack {
    /// The request that was sent.
    request: Request,
    /// When the request was recorded; compared against `ack_timeout` in `handle_acks`.
    request_time: std::time::Instant,
}
52
53impl Ack {
54    fn new(request: Request) -> Self {
55        Ack {
56            request,
57            request_time: std::time::Instant::now(),
58        }
59    }
60}
61
/// A ping this node sends on behalf of another member that asked for an indirect probe.
#[derive(Debug)]
struct PingProxyRequest {
    /// Member that asked us to probe `target`; a successful ack is relayed back to it.
    sender: Member,
    /// Member to be probed.
    target: Member,
    /// Sequence number chosen by `sender`, reused for the proxied ping.
    sequence_number: u64,
}
68
/// Relays the ack of a proxied probe back to the member that requested the probe.
#[derive(Debug)]
struct AckIndirectRequest {
    /// Destination of the relayed ack (the original requester).
    target: Member,
    /// Sequence number of the original indirect-ping request.
    sequence_number: u64,
}
74
/// Outgoing work, queued in `SyncNode::requests` and sent when the UDP socket is writable.
#[derive(Debug)]
enum Request {
    /// Initial ping to a seed address when joining a group.
    Init(SocketAddr),
    /// Direct probe of a member.
    Ping(Header),
    /// Ask other members to probe a member that did not answer a direct ping.
    PingIndirect(Header),
    /// Probe a member on behalf of another member.
    PingProxy(PingProxyRequest),
    /// Reply to a ping received from a member.
    Ack(Header),
    /// Relay a proxied probe's ack back to the requester.
    AckIndirect(AckIndirectRequest),
}
84
/// Control messages delivered to the protocol loop over its `mio_extras` channel.
#[derive(Debug)]
pub(crate) enum ChannelMessage {
    /// Leave the main loop and return from `start`.
    Stop,
    /// Reply on the given sender with this node's address followed by every known member's address.
    GetMembers(std::sync::mpsc::Sender<Vec<SocketAddr>>),
}
90
/// A deferred action that the main loop runs once `when` has passed.
// Unfortunately SyncNode needs to be passed explicitly, it cannot be captured by closure.
struct Timeout<F: FnOnce(&mut SyncNode)> {
    /// Earliest instant at which `what` may run.
    when: std::time::Instant,
    /// Action to run; it receives the node mutably.
    what: F,
}
96
/// Runs the protocol on current thread, blocking it.
pub(crate) struct SyncNode {
    /// Protocol parameters (timeouts, protocol period, indirect-probe fan-out).
    config: ProtocolConfig,
    /// UDP socket; `None` until `bind` is called from `start`.
    udp: Option<UdpSocket>,
    /// Ids of known members in probe order; reshuffled whenever a new round starts.
    ping_order: Vec<MemberId>,
    /// Member ids piggybacked on outgoing dissemination messages.
    broadcast: Disseminated<MemberId>,
    /// Membership notifications (alive / suspect / confirm) piggybacked on outgoing messages.
    notifications: Disseminated<Notification>,
    /// Known members other than this node, keyed by id.
    members: HashMap<MemberId, Member>,
    /// Position in `ping_order` of the next member to probe.
    next_member_index: usize,
    /// Number of protocol periods started so far.
    epoch: u64,
    /// Next value handed out by `get_next_sequence_number`.
    sequence_number: u64,
    /// Scratch buffer that incoming datagrams are read into.
    recv_buffer: Vec<u8>,
    /// This node's own identity and address.
    myself: Member,
    /// Outgoing requests; one is sent per writable socket event.
    requests: VecDeque<Request>,
    /// Receiving end of the control channel (see `ChannelMessage`).
    receiver: Receiver<ChannelMessage>,
    /// Sent requests still waiting for an acknowledgement.
    acks: Vec<Ack>,
    /// Random source used to shuffle `ping_order`.
    rng: SmallRng,
    /// Active suspicions, appended when suspicion starts; expired ones are taken from the front.
    suspicions: VecDeque<Suspicion>,
    /// Deferred actions run once their deadline passes.
    timeouts: Vec<Timeout<Box<dyn FnOnce(&mut SyncNode) + Send>>>,
    /// Logger; discards everything until `set_logger` is called.
    logger: slog::Logger,
}
118
119impl SyncNode {
120    pub(crate) fn new(bind_address: SocketAddr, config: ProtocolConfig) -> (SyncNode, Sender<ChannelMessage>) {
121        let (sender, receiver) = mio_extras::channel::channel();
122        let gossip = SyncNode {
123            config,
124            udp: None,
125            ping_order: vec![],
126            broadcast: Disseminated::new(),
127            notifications: Disseminated::with_limit(20),
128            members: HashMap::new(),
129            next_member_index: 0,
130            epoch: 0,
131            sequence_number: 0,
132            recv_buffer: vec![0u8; 1500],
133            myself: Member::new(bind_address),
134            requests: VecDeque::<Request>::with_capacity(32),
135            receiver,
136            acks: Vec::<Ack>::with_capacity(32),
137            rng: SmallRng::from_entropy(),
138            suspicions: VecDeque::new(),
139            timeouts: Vec::new(),
140            logger: slog::Logger::root(slog::Discard, slog::o!()),
141        };
142        (gossip, sender)
143    }
144
145    pub(crate) fn set_logger(&mut self, logger: slog::Logger) {
146        self.logger = logger.new(slog::o!("id" => self.myself.id.to_string()));
147    }
148
149    pub(crate) fn start(&mut self) -> Result<()> {
150        let poll = Poll::new().unwrap();
151        poll.register(&self.receiver, Token(1), Ready::readable(), PollOpt::empty())?;
152        self.bind(&poll)?;
153
154        let mut events = Events::with_capacity(1024);
155        let mut last_epoch_time = std::time::Instant::now();
156
157        'mainloop: loop {
158            poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
159            for event in events.iter() {
160                match event.token() {
161                    Token(0) => {
162                        if let Err(e) = self.handle_protocol_event(&event) {
163                            slog::warn!(self.logger, "Failed to process protocol event: {:?}", e);
164                        }
165                    }
166                    Token(1) => match self.receiver.try_recv() {
167                        Ok(message) => {
168                            debug!(self.logger, "ChannelMessage::{:?}", message);
169                            match message {
170                                ChannelMessage::Stop => {
171                                    break 'mainloop;
172                                }
173                                ChannelMessage::GetMembers(sender) => {
174                                    let members = std::iter::once(&self.myself.address)
175                                        .chain(self.members.values().map(|m| &m.address))
176                                        .cloned()
177                                        .collect::<Vec<_>>();
178                                    if let Err(e) = sender.send(members) {
179                                        warn!(self.logger, "Failed to send list of members: {:?}", e);
180                                    }
181                                }
182                            }
183                        }
184                        Err(e) => {
185                            debug!(self.logger, "Not ready yet: {:?}", e);
186                        }
187                    },
188                    _ => unreachable!(),
189                }
190            }
191
192            self.handle_acks()?;
193
194            self.drain_timeout_suspicions()
195                .into_iter()
196                .for_each(|s| self.handle_timeout_suspicion(&s));
197
198            let now = std::time::Instant::now();
199            if now > (last_epoch_time + Duration::from_secs(self.config.protocol_period)) {
200                //                self.show_metrics();
201                debug!(self.logger, "Notifications: {:?}", self.notifications);
202                debug!(self.logger, "Broadcast: {:?}", self.broadcast);
203
204                self.advance_epoch();
205                last_epoch_time = now;
206            }
207
208            self.handle_timeouts();
209        }
210
211        Ok(())
212    }
213
214    fn handle_acks(&mut self) -> Result<()> {
215        let now = std::time::Instant::now();
216        let ack_timeout = Duration::from_secs(self.config.ack_timeout as u64);
217        let (handle, postpone): (Vec<_>, Vec<_>) = self
218            .acks
219            .drain(..)
220            .partition(|ack| ack.request_time + ack_timeout <= now);
221        handle.into_iter().try_for_each(|ack| self.handle_timeout_ack(ack))?;
222        self.acks = postpone;
223        Ok(())
224    }
225
226    fn handle_timeouts(&mut self) {
227        let now = std::time::Instant::now();
228        let (handle, postpone): (Vec<_>, Vec<_>) = self.timeouts.drain(..).partition(|t| t.when <= now);
229        handle.into_iter().for_each(|t| (t.what)(self));
230        self.timeouts = postpone;
231    }
232
233    fn drain_timeout_suspicions(&mut self) -> Vec<Suspicion> {
234        let mut suspicions = Vec::new();
235        loop {
236            match self.suspicions.front() {
237                Some(ref suspicion) => {
238                    if std::time::Instant::now()
239                        > (suspicion.created + Duration::from_secs(self.config.suspect_timeout as u64))
240                    {
241                        suspicions.push(self.suspicions.pop_front().unwrap());
242                    } else {
243                        break;
244                    }
245                }
246                None => break,
247            }
248        }
249        suspicions
250    }
251
252    pub(crate) fn join(&mut self, member: SocketAddr) -> Result<()> {
253        assert_ne!(member, self.myself.address, "Can't join yourself");
254        self.requests.push_front(Request::Init(member));
255        self.start()
256    }
257
258    fn handle_timeout_suspicion(&mut self, suspicion: &Suspicion) {
259        // FIXME: check that the Confirm notification about the member has been published.
260        if !self.members.contains_key(&suspicion.member_id) {
261            debug!(self.logger, "Member {} already removed.", suspicion.member_id);
262            return;
263        }
264
265        let confirm = Notification::Confirm {
266            member: self.members[&suspicion.member_id].clone(),
267        };
268        self.notifications.remove(&Notification::Suspect {
269            member: self.members[&suspicion.member_id].clone(),
270        });
271        self.notifications.add(confirm);
272        self.remove_member(&suspicion.member_id)
273    }
274
275    fn advance_epoch(&mut self) {
276        if let Some(member_id) = self.get_next_member() {
277            let ping = Request::Ping(Header {
278                member_id,
279                sequence_number: self.get_next_sequence_number(),
280            });
281            self.requests.push_front(ping);
282        }
283        self.epoch += 1;
284        info!(self.logger, "New epoch: {}", self.epoch);
285    }
286
287    fn handle_timeout_ack(&mut self, ack: Ack) -> Result<()> {
288        match ack.request {
289            Request::Init(address) => {
290                info!(self.logger, "Failed to join {}", address);
291                self.timeouts.push(Timeout {
292                    when: std::time::Instant::now() + std::time::Duration::from_secs(self.config.join_retry_timeout),
293                    what: Box::new(|myself| myself.requests.push_front(ack.request)),
294                });
295            }
296            Request::Ping(header) => {
297                self.requests.push_back(Request::PingIndirect(header));
298            }
299            Request::PingIndirect(header) => {
300                self.handle_suspect(header.member_id);
301            }
302            Request::PingProxy(request) => {
303                warn!(
304                    self.logger,
305                    "Ping proxy from {} to {} timed out", request.sender.id, request.target.id
306                );
307            }
308            _ => unreachable!(),
309        }
310        Ok(())
311    }
312
313    fn bind(&mut self, poll: &Poll) -> Result<()> {
314        self.udp = Some(UdpSocket::bind(&self.myself.address).context("Failed to bind UDP socket")?);
315        // FIXME: change to `PollOpt::edge()`
316        poll.register(
317            self.udp.as_ref().unwrap(),
318            Token(0),
319            Ready::readable() | Ready::writable(),
320            PollOpt::level(),
321        )
322        .map_err(|e| format_err!("Failed to register UDP socket for polling: {:?}", e))
323    }
324
325    fn send_message(&mut self, target: SocketAddr, message: OutgoingMessage) {
326        debug!(self.logger, "{:?} <- {:?}", target, message);
327        // This can happen if this node is returning to a group before the group noticing that the node's previous
328        // instance has died.
329        // FIXME: this is not the best place for this, preferably it should be handled when receiving a message
330        // with member ID not matching oneself.
331        if target == self.myself.address {
332            debug!(self.logger, "Trying to send a message to myself, dropping it");
333            return;
334        }
335        match self.udp.as_ref().unwrap().send_to(message.buffer(), &target) {
336            Err(e) => warn!(self.logger, "Message to {:?} was not delivered due to {:?}", target, e),
337            Ok(count) => {
338                debug!(self.logger, "Send {} bytes", count);
339                if let OutgoingMessage::DisseminationMessage(ref dissemination_message) = message {
340                    self.notifications.mark(dissemination_message.num_notifications());
341                    self.broadcast.mark(dissemination_message.num_broadcast());
342                }
343            }
344        }
345    }
346
347    fn recv_letter(&mut self) -> Option<IncomingLetter> {
348        match self.udp.as_ref().unwrap().recv_from(&mut self.recv_buffer) {
349            Ok((count, sender)) => {
350                debug!(self.logger, "Received {} bytes from {:?}", count, sender);
351                let message = match decode_message(&self.recv_buffer[..count]) {
352                    Ok(message) => message,
353                    Err(e) => {
354                        warn!(self.logger, "Failed to decode from message {:#?}: {}", sender, e);
355                        return None;
356                    }
357                };
358                let letter = IncomingLetter { sender, message };
359                debug!(self.logger, "{:?}", letter);
360                Some(letter)
361            }
362            Err(e) => {
363                warn!(self.logger, "Failed to receive letter due to {:?}", e);
364                None
365            }
366        }
367    }
368
369    fn update_members<'m>(&mut self, members: impl Iterator<Item = &'m Member>) {
370        for member in members {
371            self.update_member(member);
372        }
373    }
374
375    fn update_member(&mut self, member: &Member) {
376        // FIXME(#33): re-write the check
377        let confirmed = self.notifications.iter().find(|n| match n {
378            Notification::Confirm {
379                member: confirmed_member,
380            } if confirmed_member.id == member.id => true,
381            _ => false,
382        });
383        if let Some(member) = confirmed {
384            info!(self.logger, "Member {:?} has already been marked as dead", member);
385            return;
386        }
387        if member.id != self.myself.id && self.members.insert(member.id, member.clone()).is_none() {
388            info!(self.logger, "Member joined: {:?}", member);
389            self.ping_order.push(member.id);
390            self.broadcast.add(member.id);
391        }
392    }
393
394    fn update_notifications<'m>(&mut self, notifications: impl Iterator<Item = &'m Notification>) {
395        // TODO: check this does not miss notifications with not yet seen members
396        // TODO: remove from self.notifications those notifications that are overridden by the new ones
397        for notification in notifications {
398            if self.notifications.iter().find(|&n| n >= notification).is_some() {
399                continue;
400            }
401            match notification {
402                Notification::Confirm { member } => self.remove_member(&member.id),
403                Notification::Alive { member } => self.handle_alive(member),
404                Notification::Suspect { member } => self.handle_suspect(member.id),
405            }
406            let obsolete_notifications = self
407                .notifications
408                .iter()
409                .filter(|&n| n < notification)
410                .cloned()
411                .collect::<Vec<_>>();
412            for n in obsolete_notifications {
413                self.notifications.remove(&n);
414            }
415            self.notifications.add((*notification).clone());
416        }
417    }
418
419    fn handle_alive(&mut self, member: &Member) {
420        self.update_member(member);
421    }
422
423    fn handle_suspect(&mut self, member_id: MemberId) {
424        match self.members.get(&member_id) {
425            Some(member) => {
426                // FIXME: Might be inefficient to check entire deq
427                if self.suspicions.iter().find(|s| s.member_id == member_id).is_none() {
428                    info!(self.logger, "Start suspecting member {:?}", member_id);
429                    self.suspicions.push_back(Suspicion::new(member_id));
430                    self.notifications.add(Notification::Suspect { member: member.clone() });
431                }
432            }
433            None => debug!(
434                self.logger,
435                "Trying to suspect member {:?}, which has already been removed", member_id
436            ),
437        }
438    }
439
440    fn remove_member(&mut self, member_id: &MemberId) {
441        if let Some(removed_member) = self.members.remove(member_id) {
442            let idx = self.ping_order.iter().position(|e| e == member_id).unwrap();
443            self.ping_order.remove(idx);
444            self.broadcast.remove(member_id);
445            if idx <= self.next_member_index && self.next_member_index > 0 {
446                self.next_member_index -= 1;
447            }
448            info!(self.logger, "Member removed: {:?}", removed_member);
449        }
450    }
451
452    fn get_next_member(&mut self) -> Option<MemberId> {
453        if self.ping_order.is_empty() {
454            return None;
455        }
456        // Following SWIM paper, section 4.3, next member to probe is picked in round-robin fashion, with all
457        // the members randomly shuffled after each one has been probed.
458        // FIXME: one thing that is missing is that new members are always added at the end instead of at uniformly
459        // random position.
460        if self.next_member_index == 0 {
461            self.ping_order.shuffle(&mut self.rng);
462        }
463        let target = self.ping_order[self.next_member_index];
464        self.next_member_index = (self.next_member_index + 1) % self.ping_order.len();
465        Some(target)
466    }
467
468    fn get_next_sequence_number(&mut self) -> u64 {
469        let sequence_number = self.sequence_number;
470        self.sequence_number += 1;
471        sequence_number
472    }
473
474    fn handle_protocol_event(&mut self, event: &Event) -> Result<()> {
475        if event.readiness().is_readable() {
476            if let Some(letter) = self.recv_letter() {
477                match letter.message {
478                    IncomingMessage::Ping(m) => self.handle_ping(&m),
479                    IncomingMessage::Ack(m) => self.handle_ack(&m),
480                    IncomingMessage::PingRequest(m) => self.handle_indirect_ping(&m),
481                }
482            }
483        } else if event.readiness().is_writable() {
484            if let Some(request) = self.requests.pop_front() {
485                debug!(self.logger, "{:?}", request);
486                match request {
487                    Request::Init(address) => {
488                        let message = DisseminationMessageEncoder::new(1024)
489                            .message_type(MessageType::Ping)?
490                            .sender(&self.myself)?
491                            .sequence_number(0)?
492                            .encode();
493                        self.send_message(address, message);
494                        self.acks.push(Ack::new(request));
495                    }
496                    Request::Ping(ref header) if self.members.contains_key(&header.member_id) => {
497                        let message = DisseminationMessageEncoder::new(1024)
498                            .message_type(MessageType::Ping)?
499                            .sender(&self.myself)?
500                            .sequence_number(header.sequence_number)?
501                            .notifications(self.notifications.iter())?
502                            .broadcast(self.broadcast.iter().map(|id| &self.members[id]))?
503                            .encode();
504                        self.send_message(self.members[&header.member_id].address, message);
505                        self.acks.push(Ack::new(request));
506                    }
507                    Request::Ping(ref header) => {
508                        info!(
509                            self.logger,
510                            "Dropping Ping message, member {} has already been removed.", header.member_id
511                        );
512                    }
513                    Request::PingIndirect(ref header) if self.members.contains_key(&header.member_id) => {
514                        let indirect_members = self
515                            .members
516                            .keys()
517                            .filter(|&key| *key != header.member_id)
518                            .take(self.config.num_indirect as usize)
519                            .cloned()
520                            .collect::<Vec<_>>();
521                        indirect_members.iter().try_for_each(|member_id| -> Result<()> {
522                            let message = PingRequestMessageEncoder::new()
523                                .sender(&self.myself)?
524                                .sequence_number(header.sequence_number)?
525                                .target(&self.members[&header.member_id])?
526                                .encode();
527                            self.send_message(self.members[member_id].address, message);
528                            Ok(())
529                        })?;
530                        self.acks.push(Ack::new(request));
531                    }
532                    Request::PingIndirect(ref header) => {
533                        info!(
534                            self.logger,
535                            "Dropping PingIndirect message, member {} has already been removed.", header.member_id
536                        );
537                    }
538                    Request::PingProxy(ref ping_proxy) => {
539                        let message = DisseminationMessageEncoder::new(1024)
540                            .message_type(MessageType::Ping)?
541                            .sender(&self.myself)?
542                            .sequence_number(ping_proxy.sequence_number)?
543                            .notifications(self.notifications.iter())?
544                            .broadcast(self.broadcast.iter().map(|id| &self.members[id]))?
545                            .encode();
546                        self.send_message(ping_proxy.target.address, message);
547                        self.acks.push(Ack::new(request));
548                    }
549                    Request::Ack(header) => {
550                        let message = DisseminationMessageEncoder::new(1024)
551                            .message_type(MessageType::PingAck)?
552                            .sender(&self.myself)?
553                            .sequence_number(header.sequence_number)?
554                            .notifications(self.notifications.iter())?
555                            .broadcast(self.broadcast.iter().map(|id| &self.members[id]))?
556                            .encode();
557                        self.send_message(self.members[&header.member_id].address, message);
558                    }
559                    Request::AckIndirect(ack_indirect) => {
560                        let message = DisseminationMessageEncoder::new(1024)
561                            .message_type(MessageType::PingAck)?
562                            .sender(&self.myself)?
563                            .sequence_number(ack_indirect.sequence_number)?
564                            .encode();
565                        self.send_message(ack_indirect.target.address, message);
566                    }
567                }
568            }
569        }
570        Ok(())
571    }
572
573    fn update_state(&mut self, message: &DisseminationMessageIn) {
574        self.update_notifications(message.notifications.iter());
575        self.update_member(&message.sender);
576        self.update_members(message.broadcast.iter());
577    }
578
    /// Matches an incoming `PingAck` against the pending acks and drops those it satisfies.
    ///
    /// Every pending ack is examined. Satisfied ones are discarded (via `continue`) and the rest are
    /// pushed back onto `self.acks`.
    ///
    /// For `Init`, `Ping` and `PingIndirect` entries, the message's payload is applied with
    /// `update_state`. This happens once per such pending entry, and not at all when only
    /// `PingProxy` entries (or none) are pending.
    ///
    /// # Panics
    /// Panics if an `Init` ack is pending and the message does not come from the join address with
    /// sequence number 0.
    fn handle_ack(&mut self, message: &DisseminationMessageIn) {
        for ack in self.acks.drain(..).collect::<Vec<_>>() {
            match ack.request {
                Request::Init(address) => {
                    self.update_state(message);
                    // While joining, the ack must come from the seed address and echo sequence 0.
                    if message.sender.address != address || message.sequence_number != 0 {
                        panic!("Initial ping request failed, unable to continue");
                    }
                    continue;
                }
                Request::PingIndirect(ref header) => {
                    self.update_state(message);
                    // NOTE(review): a relayed ack (`Request::AckIndirect` in `handle_protocol_event`)
                    // is encoded with the proxy as its sender, so `message.sender.id` is the proxy's
                    // id, not `header.member_id`. Only a late direct ack from the probed member
                    // satisfies this check — confirm whether relayed acks should clear it too.
                    if message.sender.id == header.member_id && message.sequence_number == header.sequence_number {
                        continue;
                    }
                }
                Request::PingProxy(ref ping_proxy) => {
                    // The probed member answered our proxied ping: queue a relay to the requester.
                    // The message's dissemination payload is not applied for this case.
                    if message.sender.id == ping_proxy.target.id
                        && message.sequence_number == ping_proxy.sequence_number
                    {
                        self.requests.push_back(Request::AckIndirect(AckIndirectRequest {
                            target: ping_proxy.sender.clone(),
                            sequence_number: ping_proxy.sequence_number,
                        }));
                        continue;
                    }
                }
                Request::Ping(ref header) => {
                    self.update_state(message);
                    if message.sender.id == header.member_id && message.sequence_number == header.sequence_number {
                        continue;
                    }
                }
                // `Ack` and `AckIndirect` requests are never pushed onto `acks`.
                _ => unreachable!(),
            }
            // Not satisfied by this message; keep waiting for it.
            self.acks.push(ack);
        }
    }
617
618    fn handle_ping(&mut self, message: &DisseminationMessageIn) {
619        self.update_state(message);
620        self.requests.push_back(Request::Ack(Header {
621            member_id: message.sender.id,
622            sequence_number: message.sequence_number,
623        }));
624    }
625
626    fn handle_indirect_ping(&mut self, message: &PingRequestMessageIn) {
627        self.requests.push_back(Request::PingProxy(PingProxyRequest {
628            sender: message.sender.clone(),
629            target: message.target.clone(),
630            sequence_number: message.sequence_number,
631        }));
632    }
633}