// foca/runtime.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
use alloc::collections::VecDeque;
use core::{cmp::Ordering, time::Duration};

use bytes::{Bytes, BytesMut};

#[cfg(feature = "unstable-notifications")]
use crate::{Header, ProbeNumber};
use crate::{Identity, Incarnation};

/// A Runtime is Foca's gateway to the real world: here is where
/// implementations decide how to interact with the network, the
/// hardware timer and the user.
///
/// Implementations may react directly to it for a fully synchronous
/// behavior or accumulate-then-drain when dispatching via fancier
/// mechanisms like async.
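///
/// A minimal sketch of a fully synchronous implementation, only to
/// illustrate the shape of the trait; `LoggingRuntime` is a hypothetical
/// name and not part of this crate:
///
/// ```ignore
/// use core::time::Duration;
/// use foca::{Identity, Notification, Runtime, Timer};
///
/// struct LoggingRuntime;
///
/// impl<T: Identity + core::fmt::Debug> Runtime<T> for LoggingRuntime {
///     fn notify(&mut self, notification: Notification<'_, T>) {
///         // React to membership changes, or ignore them entirely
///         println!("notification: {notification:?}");
///     }
///
///     fn send_to(&mut self, to: T, data: &[u8]) {
///         // A real implementation would hand `data` to its transport here
///         println!("would send {} bytes to {to:?}", data.len());
///     }
///
///     fn submit_after(&mut self, event: Timer<T>, after: Duration) {
///         // A real implementation MUST arrange for `event` to be given
///         // back to Foca once `after` has elapsed
///         println!("would deliver {event:?} in {after:?}");
///     }
/// }
/// ```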
pub trait Runtime<T>
where
    T: Identity,
{
    /// Whenever something significantly changes Foca's state, a
    /// notification is emitted.
    ///
    /// It's the best mechanism to watch for membership changes
    /// and allows implementors to keep track of the cluster state
    /// without having direct access to the running Foca instance.
    ///
    /// Implementations may completely disregard this if desired.
    fn notify(&mut self, notification: Notification<'_, T>);

    /// This is how Foca connects to an actual transport.
    ///
    /// Implementations are responsible for the actual delivery.
    fn send_to(&mut self, to: T, data: &[u8]);

    /// Request to schedule the delivery of a given event after
    /// a specified duration.
    ///
    /// Implementations MUST ensure that every event is delivered.
    /// Foca is very tolerant of delays, but non-delivery will
    /// cause errors.
    fn submit_after(&mut self, event: Timer<T>, after: Duration);
}

// A mutable reference to a Runtime is a Runtime too
impl<T, R> Runtime<T> for &mut R
where
    T: Identity,
    R: Runtime<T>,
{
    fn notify(&mut self, notification: Notification<'_, T>) {
        R::notify(self, notification);
    }

    fn send_to(&mut self, to: T, data: &[u8]) {
        R::send_to(self, to, data);
    }

    fn submit_after(&mut self, event: Timer<T>, after: Duration) {
        R::submit_after(self, event, after);
    }
}

/// A Notification contains information about high-level relevant
/// state changes in the cluster or Foca itself.
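///
/// A rough sketch of keeping an active-member set up to date from
/// notifications; `members` is assumed to be a collection owned by the
/// runtime, e.g. a `BTreeSet<T>` with `T: Ord + Clone`:
///
/// ```ignore
/// match notification {
///     Notification::MemberUp(id) => {
///         members.insert(id.clone());
///     }
///     Notification::MemberDown(id) => {
///         members.remove(id);
///     }
///     Notification::Rename(old, new) => {
///         // `new` supersedes `old`; if `old` was being tracked as
///         // active, `new` takes its place
///         if members.remove(old) {
///             members.insert(new.clone());
///         }
///     }
///     _ => {}
/// }
/// ```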
#[derive(Debug, PartialEq, Eq)]
pub enum Notification<'a, T> {
    /// Foca discovered a new active member with identity T.
    MemberUp(&'a T),
    /// A previously active member has been declared down by the cluster.
    ///
    /// If Foca detects a down member but didn't know about its activity
    /// before, this notification will not be emitted.
    ///
    /// Can only happen if `MemberUp(T)` happened before.
    MemberDown(&'a T),

    /// Foca has learned that there's a more recent identity with
    /// the same address and chose to use it instead of the previous
    /// one.
    ///
    /// So `Notification::Rename(A,B)` means that we knew about a member
    /// `A` but now there's a `B` with the same `Identity::Addr` and
    /// foca chose to keep it, i.e. `B.win_addr_conflict(A) == true`.
    ///
    /// This happens naturally when a member rejoins the cluster after
    /// any event (maybe they were declared down and `Identity::renew`d
    /// themselves, maybe it's a restart/upgrade process).
    ///
    /// Example:
    ///
    /// If `A` was considered Down and `B` is Alive, you'll get
    /// two notifications, in order:
    ///
    ///  1. `Notification::Rename(A,B)`
    ///  2. `Notification::MemberUp(B)`
    ///
    /// However, if there's no liveness change (both are active
    /// or both are down), you'll only get the `Rename` notification.
    Rename(&'a T, &'a T),

    /// Foca's current identity is known by at least one active member
    /// of the cluster.
    ///
    /// Fired when successfully joining a cluster for the first time and
    /// every time after a successful identity change.
    Active,

    /// All known active members have either left the cluster or been
    /// declared down.
    Idle,

    /// Foca's current identity has been declared down.
    ///
    /// Manual intervention via `Foca::change_identity` or
    /// `Foca::reuse_down_identity` is required to return to a functioning
    /// state.
    Defunct,

    /// Foca automatically changed its identity and rejoined the cluster
    /// after being declared down.
    ///
    /// This happens instead of `Defunct` when identities opt in to
    /// `Identity::renew()` functionality.
    Rejoin(&'a T),

    #[cfg(feature = "unstable-notifications")]
    /// Foca successfully decoded data received via [`crate::Foca::handle_data`]
    ///
    /// See [`Header`]
    DataReceived(&'a Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// Foca sent data to a peer
    ///
    /// See [`Header`]
    DataSent(&'a Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// Foca has probed a member and didn't receive a timely reply
    /// from it nor from any other member asked to indirectly ping it
    ///
    /// See [`crate::Message`]
    ProbeFailed(ProbeNumber, &'a T),
}

impl<T> Notification<'_, T>
where
    T: Clone,
{
    /// Converts self into an [`OwnedNotification`]
    pub fn to_owned(self) -> OwnedNotification<T> {
        match self {
            Notification::MemberUp(m) => OwnedNotification::MemberUp(m.clone()),
            Notification::MemberDown(m) => OwnedNotification::MemberDown(m.clone()),
            Notification::Rename(before, after) => {
                OwnedNotification::Rename(before.clone(), after.clone())
            }
            Notification::Active => OwnedNotification::Active,
            Notification::Idle => OwnedNotification::Idle,
            Notification::Defunct => OwnedNotification::Defunct,
            Notification::Rejoin(id) => OwnedNotification::Rejoin(id.clone()),
            #[cfg(feature = "unstable-notifications")]
            Notification::DataReceived(h) => OwnedNotification::DataReceived(h.clone()),
            #[cfg(feature = "unstable-notifications")]
            Notification::DataSent(h) => OwnedNotification::DataSent(h.clone()),
            #[cfg(feature = "unstable-notifications")]
            Notification::ProbeFailed(n, m) => OwnedNotification::ProbeFailed(n, m.clone()),
        }
    }
}

/// An owned `Notification`, for convenience.
///
/// See [`Notification`] for details
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnedNotification<T> {
    /// See [`Notification::MemberUp`]
    MemberUp(T),
    /// See [`Notification::MemberDown`]
    MemberDown(T),
    /// See [`Notification::Rename`]
    Rename(T, T),
    /// See [`Notification::Active`]
    Active,
    /// See [`Notification::Idle`]
    Idle,
    /// See [`Notification::Defunct`]
    Defunct,
    /// See [`Notification::Rejoin`]
    Rejoin(T),
    #[cfg(feature = "unstable-notifications")]
    /// See [`Notification::DataReceived`]
    DataReceived(Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// See [`Notification::DataSent`]
    DataSent(Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// See [`Notification::ProbeFailed`]
    ProbeFailed(ProbeNumber, T),
}

/// Timer is an event that's scheduled by a [`Runtime`]. You won't need
/// to construct or understand these; just ensure a timely delivery.
///
/// **Warning:** This type implements [`Ord`] to facilitate correcting
/// for out-of-order delivery due to the runtime lagging for whatever
/// reason. It assumes the events being sorted come from the same foca
/// instance and are not being persisted after being handled
/// via [`crate::Foca::handle_timer`]. Any use outside of this scenario
/// will likely lead to unintended consequences.
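///
/// A sketch of how a lagging runtime might use this ordering before
/// handing events back; `due_events`, `foca` and `runtime` below are
/// illustrative names, not part of this crate:
///
/// ```ignore
/// // Events that became due while the runtime was stalled
/// due_events.sort_unstable();
/// for event in due_events {
///     foca.handle_timer(event, &mut runtime)?;
/// }
/// ```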
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Timer<T> {
    /// Pick a random active member and initiate the probe cycle.
    ProbeRandomMember(TimerToken),

    /// Send indirect probes if the direct one hasn't completed yet.
    SendIndirectProbe {
        /// The current member being probed
        probed_id: T,
        /// See `TimerToken`
        token: TimerToken,
    },

    /// Transitions member T from Suspect to Down if the incarnation is
    /// still the same.
    ChangeSuspectToDown {
        /// Target member identity
        member_id: T,
        /// Its Incarnation the moment the suspicion was raised. If the
        /// member refutes the suspicion (by increasing its Incarnation),
        /// this won't match and it won't be declared Down.
        incarnation: Incarnation,
        /// See `TimerToken`
        token: TimerToken,
    },

    /// Sends a [`crate::Message::Announce`] to randomly chosen members as
    /// specified by [`crate::Config::periodic_announce`]
    PeriodicAnnounce(TimerToken),

    /// Sends a [`crate::Message::Announce`] to randomly chosen members
    /// that are considered [`crate::State::Down`] as specified by
    /// [`crate::Config::periodic_announce_to_down_members`]
    PeriodicAnnounceDown(TimerToken),

    /// Sends a [`crate::Message::Gossip`] to randomly chosen members as
    /// specified by [`crate::Config::periodic_gossip`]
    PeriodicGossip(TimerToken),

    /// Forgets about dead member `T`, allowing them to join the
    /// cluster again with the same identity.
    RemoveDown(T),
}

impl<T> Timer<T> {
    fn seq(&self) -> u8 {
        match self {
            Timer::SendIndirectProbe {
                probed_id: _,
                token: _,
            } => 0,
            Timer::ProbeRandomMember(_) => 1,
            Timer::ChangeSuspectToDown {
                member_id: _,
                incarnation: _,
                token: _,
            } => 2,
            Timer::PeriodicAnnounce(_) => 3,
            Timer::PeriodicGossip(_) => 4,
            Timer::RemoveDown(_) => 5,
            Timer::PeriodicAnnounceDown(_) => 6,
        }
    }
}

impl<T: PartialEq> core::cmp::PartialOrd for Timer<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.seq().partial_cmp(&other.seq())
    }
}

impl<T: Eq> core::cmp::Ord for Timer<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.partial_cmp(other).expect("total ordering")
    }
}

/// `TimerToken` is simply a bookkeeping mechanism that helps prevent
/// reacting to previously dispatched events that are no longer relevant.
///
/// Certain interactions may cause Foca to decide to disregard every
/// event it scheduled previously, so it changes the token in order
/// to drop everything that doesn't match.
///
/// Similar in spirit to [`crate::ProbeNumber`].
pub type TimerToken = u8;

/// A `Runtime` implementation that's good enough for simple use-cases.
///
/// It accumulates all events that happen during an interaction with
/// `crate::Foca`, and users must drain those and react accordingly.
///
/// Better runtimes would react directly to the events, instead of
/// needlessly storing them in a queue.
///
/// Users must drain the runtime immediately after interacting with
/// foca.
///
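/// A rough sketch of that drain loop; `foca`, `packet`, `socket` and
/// `scheduler` below are placeholders for whatever the application
/// uses, not part of this crate:
///
/// ```ignore
/// let mut runtime = AccumulatingRuntime::new();
/// foca.handle_data(&packet, &mut runtime)?;
///
/// // Deliver the accumulated packets to the transport
/// while let Some((dst, data)) = runtime.to_send() {
///     socket.send_to(&data, dst)?;
/// }
/// // Arrange for timer events to be fed back to this foca instance later
/// while let Some((delay, event)) = runtime.to_schedule() {
///     scheduler.submit(delay, event);
/// }
/// // React to membership changes
/// while let Some(notification) = runtime.to_notify() {
///     println!("{notification:?}");
/// }
/// assert_eq!(0, runtime.backlog());
/// ```
///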
/// See it in use at `examples/foca_insecure_udp_agent.rs`
pub struct AccumulatingRuntime<T> {
    to_send: VecDeque<(T, Bytes)>,
    to_schedule: VecDeque<(Duration, Timer<T>)>,
    notifications: VecDeque<OwnedNotification<T>>,
    buf: BytesMut,
}

impl<T> Default for AccumulatingRuntime<T> {
    fn default() -> Self {
        Self {
            to_send: Default::default(),
            to_schedule: Default::default(),
            notifications: Default::default(),
            buf: Default::default(),
        }
    }
}

impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
    fn notify(&mut self, notification: Notification<'_, T>) {
        self.notifications.push_back(notification.to_owned());
    }

    fn send_to(&mut self, to: T, data: &[u8]) {
        self.buf.extend_from_slice(data);
        let packet = self.buf.split().freeze();
        self.to_send.push_back((to, packet));
    }

    fn submit_after(&mut self, event: Timer<T>, after: Duration) {
        self.to_schedule.push_back((after, event));
    }
}

impl<T> AccumulatingRuntime<T> {
    /// Create a new `AccumulatingRuntime`
    pub fn new() -> Self {
        Self::default()
    }

    /// Yields data to be sent to a cluster member `T` in the
    /// order it was produced.
    ///
    /// Users are expected to drain it until it yields `None`
    /// after every interaction with `crate::Foca`
    pub fn to_send(&mut self) -> Option<(T, Bytes)> {
        self.to_send.pop_front()
    }

    /// Yields timer events and how far in the future they
    /// must be given back to the foca instance that produced them
    ///
    /// Users are expected to drain it until it yields `None`
    /// after every interaction with `crate::Foca`
    pub fn to_schedule(&mut self) -> Option<(Duration, Timer<T>)> {
        self.to_schedule.pop_front()
    }

    /// Yields event notifications in the order they've happened
    ///
    /// Users are expected to drain it until it yields `None`
    /// after every interaction with `crate::Foca`
    pub fn to_notify(&mut self) -> Option<OwnedNotification<T>> {
        self.notifications.pop_front()
    }

    /// Returns how many unhandled events are left in this runtime
    ///
    /// Should be brought down to zero after every interaction with
    /// `crate::Foca`
    pub fn backlog(&self) -> usize {
        self.to_send.len() + self.to_schedule.len() + self.notifications.len()
    }
}

#[cfg(test)]
impl<T: PartialEq> AccumulatingRuntime<T> {
    pub(crate) fn clear(&mut self) {
        self.notifications.clear();
        self.to_send.clear();
        self.to_schedule.clear();
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.notifications.is_empty() && self.to_send.is_empty() && self.to_schedule.is_empty()
    }

    pub(crate) fn take_all_data(&mut self) -> VecDeque<(T, Bytes)> {
        core::mem::take(&mut self.to_send)
    }

    pub(crate) fn take_data(&mut self, dst: T) -> Option<Bytes> {
        let position = self.to_send.iter().position(|(to, _data)| to == &dst)?;

        self.to_send.remove(position).map(|(_, data)| data)
    }

    pub(crate) fn take_notification(
        &mut self,
        wanted: OwnedNotification<T>,
    ) -> Option<OwnedNotification<T>> {
        let position = self
            .notifications
            .iter()
            .position(|notification| notification == &wanted)?;

        self.notifications.remove(position)
    }

    pub(crate) fn take_scheduling(&mut self, timer: Timer<T>) -> Option<Duration> {
        let position = self
            .to_schedule
            .iter()
            .position(|(_when, event)| event == &timer)?;

        self.to_schedule.remove(position).map(|(when, _)| when)
    }

    pub(crate) fn find_scheduling<F>(&self, predicate: F) -> Option<&Timer<T>>
    where
        F: Fn(&Timer<T>) -> bool,
    {
        self.to_schedule
            .iter()
            .find(|(_, timer)| predicate(timer))
            .map(|(_, timer)| timer)
    }
}

#[cfg(test)]
mod tests {
    use super::Timer;

    #[test]
    fn timers_sort() {
        // What we really care about is SendIndirectProbe
        // appearing before ProbeRandomMember;
        // Foca tolerates other events in arbitrary order
        // without emitting scary errors / traces.
        let mut out_of_order = alloc::vec![
            Timer::<u8>::RemoveDown(0),
            Timer::<u8>::ChangeSuspectToDown {
                member_id: 0,
                incarnation: 0,
                token: 0,
            },
            Timer::<u8>::ProbeRandomMember(0),
            Timer::<u8>::SendIndirectProbe {
                probed_id: 0,
                token: 0
            },
        ];

        out_of_order.sort_unstable();

        assert_eq!(
            alloc::vec![
                Timer::<u8>::SendIndirectProbe {
                    probed_id: 0,
                    token: 0
                },
                Timer::<u8>::ProbeRandomMember(0),
                Timer::<u8>::ChangeSuspectToDown {
                    member_id: 0,
                    incarnation: 0,
                    token: 0,
                },
                Timer::<u8>::RemoveDown(0),
            ],
            out_of_order
        );
    }
}