blip 0.3.0

A crate for writing fast and highly resilient in-process gRPC service meshes.
// Copyright 2020 nytopop (Eric Izoita)
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//! Distributed fault detection.
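//!
//! Each member periodically probes the subjects it observes (its successors on each of the
//! `k` rings). A subject that fails `fd_strikes` successive probes has its edges reported
//! as down to the rest of the cluster.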
use super::{
    cut::{self, Subscription},
    proto::{membership_client::MembershipClient, Ack, Edge, Endpoint},
    Cluster,
};
use futures::{
    future::{join, FutureExt},
    stream::{FuturesUnordered, StreamExt},
};
use std::{collections::HashMap, sync::Arc};
use tokio::{
    select,
    time::{sleep, timeout},
};

impl Cluster {
    /// Run the fault detector until the cluster is brought down.
    pub(crate) async fn detect_faults(self: Arc<Self>, mut cuts: Subscription) -> cut::Result {
        loop {
            select! {
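                // a probing round resolved (a view-change was accepted); loop to start a new one.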
                _ = self.spin_fd_probes() => {}
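                // a cut restarts the round against the new view; an error stops the detector.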
                cut = cuts.recv() => { cut?; }
            }
        }
    }

    /// Initialize a fault detection round and continuously probe all observed subjects. Edge
    /// failures are reported to the rest of the cluster once a subject fails the configured
    /// number of successive probes.
    ///
    /// Resolves (and should be restarted) when the next view-change proposal is accepted.
    async fn spin_fd_probes(self: &Arc<Self>) {
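        // snapshot the current configuration and compute our probe subjects; the async block
        // scopes the read lock so it's released before any probes are sent.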
        let (conf_id, mut subjects) = async {
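            // per-subject bookkeeping: the rings we observe it on, and its current run of
            // failed probes.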
            #[derive(Default)]
            struct Subject {
                rings: Vec<u64>,
                faults: usize,
            }

            let mut subjects: HashMap<_, Subject> = HashMap::with_capacity(self.cfg.k);
            let state = self.state.read().await;

            // we might have been assigned the same subject on multiple rings, so we dedupe
            // by subject and track which rings we're authoritative over.
            for (ring, subject) in (state.nodes.successors(&self.local_node()))
                .cloned()
                .enumerate()
            {
                subjects.entry(subject).or_default().rings.push(ring as u64);
            }

            (state.conf_id, subjects)
        }
        .await;

        loop {
            // start sending off probes, each of which times out after fd_timeout.
            let probes = (subjects.iter_mut())
                .map(|(e, s)| self.probe(e, &mut s.faults))
                .collect::<FuturesUnordered<_>>()
                .for_each(|_| async {});

            // wait for all probes to finish, and for fd_timeout to elapse. this caps the
            // rate at which subjects are probed to k per fd_timeout.
            let mut state = join(sleep(self.cfg.fd_timeout), probes)
                .then(|_| self.state.write())
                .await;

            // if there's been a view-change, subjects may have become invalidated.
            if state.conf_id != conf_id {
                break;
            }

            // subjects are marked as faulted if they failed fd_strikes successive probes.
            let faulted = (subjects.iter_mut())
                .filter(|(_, s)| s.faults >= self.cfg.fd_strikes)
                .flat_map(|(e, s)| {
                    s.faults = 0;
                    s.rings.iter().map(move |ring| Edge::down(e.clone(), *ring))
                });

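            // report the down edges so they can be proposed to the rest of the cluster.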
            self.enqueue_edges(&mut *state, faulted);
        }
    }

    /// Probe a subject, resetting the successive `faults` counter on success or incrementing
    /// it on failure.
    async fn probe(&self, subject: &Endpoint, faults: &mut usize) {
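        // the entire probe (endpoint resolution, connection, and rpc) must complete within
        // fd_timeout to count as a success.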
        let send_probe = timeout(self.cfg.fd_timeout, async {
            let e = self.resolve_endpoint(subject).ok()?;
            let mut c = MembershipClient::connect(e).await.ok()?;
            c.probe(Ack {}).await.ok()
        });

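        // Err(_) from the timeout and None from the inner block are both treated as a
        // failed probe.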
        match send_probe.await.ok().flatten() {
            Some(_) => *faults = 0,
            None => *faults += 1,
        }
    }
}