foca/runtime.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4use alloc::collections::VecDeque;
5use core::{cmp::Ordering, time::Duration};
6
7use bytes::{Bytes, BytesMut};
8
9#[cfg(feature = "unstable-notifications")]
10use crate::{Header, ProbeNumber};
11use crate::{Identity, Incarnation};
12
/// A Runtime is Foca's gateway to the real world: here is where
/// implementations decide how to interact with the network, the
/// hardware timer and the user.
///
/// Implementations may react directly to it for a fully synchronous
/// behavior or accumulate-then-drain when dispatching via fancier
/// mechanisms like async.
pub trait Runtime<T>
where
    T: Identity,
{
    /// Whenever something changes Foca's state significantly a
    /// notification is emitted.
    ///
    /// It's the best mechanism to watch for membership changes
    /// and allows implementors to keep track of the cluster state
    /// without having direct access to the running Foca instance.
    ///
    /// Implementations may completely disregard this if desired.
    ///
    /// See [`Notification`]
    fn notify(&mut self, notification: Notification<'_, T>);

    /// This is how Foca connects to an actual transport.
    ///
    /// Implementations are responsible for the actual delivery.
    fn send_to(&mut self, to: T, data: &[u8]);

    /// Request to schedule the delivery of a given event after
    /// a specified duration.
    ///
    /// Implementations MUST ensure that every event is delivered.
    /// Foca is very tolerant to delays, but non-delivery will
    /// cause errors.
    ///
    /// See [`Timer`] and [`crate::Foca::handle_timer`]
    fn submit_after(&mut self, event: Timer<T>, after: Duration);
}
47
48// A mutable reference to a Runtime is a Runtime too
49impl<T, R> Runtime<T> for &mut R
50where
51 T: Identity,
52 R: Runtime<T>,
53{
54 fn notify(&mut self, notification: Notification<'_, T>) {
55 R::notify(self, notification);
56 }
57
58 fn send_to(&mut self, to: T, data: &[u8]) {
59 R::send_to(self, to, data);
60 }
61
62 fn submit_after(&mut self, event: Timer<T>, after: Duration) {
63 R::submit_after(self, event, after);
64 }
65}
66
/// A Notification contains information about high-level relevant
/// state changes in the cluster or Foca itself.
#[derive(Debug, PartialEq, Eq)]
pub enum Notification<'a, T> {
    /// Foca discovered a new active member with identity T.
    MemberUp(&'a T),
    /// A previously active member has been declared down by the cluster.
    ///
    /// If Foca detects a down member but didn't know about its activity
    /// before, this notification will not be emitted.
    ///
    /// Can only happen if `MemberUp(T)` happened before.
    MemberDown(&'a T),

    /// Foca has learned that there's a more recent identity with
    /// the same address and chose to use it instead of the previous
    /// one.
    ///
    /// So `Notification::Rename(A,B)` means that we knew about a member
    /// `A` but now there's a `B` with the same `Identity::Addr` and
    /// foca chose to keep it. i.e. `B.win_addr_conflict(A) == true`.
    ///
    /// This happens naturally when a member rejoins the cluster after
    /// any event (maybe they were declared down and `Identity::renew`d
    /// themselves, maybe it's a restart/upgrade process)
    ///
    /// Example:
    ///
    /// If `A` was considered Down and `B` is Alive, you'll get
    /// two notifications, in order:
    ///
    /// 1. `Notification::Rename(A,B)`
    /// 2. `Notification::MemberUp(B)`
    ///
    /// However, if there's no liveness change (both are active
    /// or both are down), you'll only get the `Rename` notification.
    Rename(&'a T, &'a T),

    /// Foca's current identity is known by at least one active member
    /// of the cluster.
    ///
    /// Fired when successfully joining a cluster for the first time and
    /// every time after a successful identity change.
    Active,

    /// All known active members have either left the cluster or been
    /// declared down.
    Idle,

    /// Foca's current identity has been declared down.
    ///
    /// Manual intervention via `Foca::change_identity` or
    /// `Foca::reuse_down_identity` is required to return to a functioning
    /// state.
    Defunct,

    /// Foca automatically changed its identity and rejoined the cluster
    /// after being declared down.
    ///
    /// This happens instead of `Defunct` when identities opt-in on
    /// `Identity::renew()` functionality.
    Rejoin(&'a T),

    #[cfg(feature = "unstable-notifications")]
    /// Foca successfully decoded data received via [`crate::Foca::handle_data`]
    ///
    /// See [`Header`]
    DataReceived(&'a Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// Foca sent data to a peer
    ///
    /// See [`Header`]
    DataSent(&'a Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// Foca has probed a member and didn't receive a timely reply
    /// from it nor from any other member asked to indirectly ping it
    ///
    /// See [`crate::Message`]
    ProbeFailed(ProbeNumber, &'a T),
}
149
150impl<T> Notification<'_, T>
151where
152 T: Clone,
153{
154 /// Converts self into a [`OwnedNotification`]
155 pub fn to_owned(self) -> OwnedNotification<T> {
156 match self {
157 Notification::MemberUp(m) => OwnedNotification::MemberUp(m.clone()),
158 Notification::MemberDown(m) => OwnedNotification::MemberDown(m.clone()),
159 Notification::Rename(before, after) => {
160 OwnedNotification::Rename(before.clone(), after.clone())
161 }
162 Notification::Active => OwnedNotification::Active,
163 Notification::Idle => OwnedNotification::Idle,
164 Notification::Defunct => OwnedNotification::Defunct,
165 Notification::Rejoin(id) => OwnedNotification::Rejoin(id.clone()),
166 #[cfg(feature = "unstable-notifications")]
167 Notification::DataReceived(h) => OwnedNotification::DataReceived(h.clone()),
168 #[cfg(feature = "unstable-notifications")]
169 Notification::DataSent(h) => OwnedNotification::DataSent(h.clone()),
170 #[cfg(feature = "unstable-notifications")]
171 Notification::ProbeFailed(n, m) => OwnedNotification::ProbeFailed(n, m.clone()),
172 }
173 }
174}
175
/// An owned `Notification`, for convenience.
///
/// This is what [`AccumulatingRuntime`] stores and yields when
/// draining notifications.
///
/// See [`Notification`] for details
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnedNotification<T> {
    /// See [`Notification::MemberUp`]
    MemberUp(T),
    /// See [`Notification::MemberDown`]
    MemberDown(T),
    /// See [`Notification::Rename`]
    Rename(T, T),
    /// See [`Notification::Active`]
    Active,
    /// See [`Notification::Idle`]
    Idle,
    /// See [`Notification::Defunct`]
    Defunct,
    /// See [`Notification::Rejoin`]
    Rejoin(T),
    #[cfg(feature = "unstable-notifications")]
    /// See [`Notification::DataReceived`]
    DataReceived(Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// See [`Notification::DataSent`]
    DataSent(Header<T>),

    #[cfg(feature = "unstable-notifications")]
    /// See [`Notification::ProbeFailed`]
    ProbeFailed(ProbeNumber, T),
}
207
/// Timer is an event that's scheduled by a [`Runtime`]. You won't need
/// to construct or understand these, just ensure a timely delivery.
///
/// **Warning:** This type implements [`Ord`] to facilitate correcting
/// for out-of-order delivery due to the runtime lagging for whatever
/// reason. It assumes the events being sorted come from the same foca
/// instance and are not being persisted after being handled
/// via [`crate::Foca::handle_timer`]. Any use outside of this scenario
/// will likely lead to unintended consequences.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Timer<T> {
    /// Pick a random active member and initiate the probe cycle.
    ProbeRandomMember(TimerToken),

    /// Send indirect probes if the direct one hasn't completed yet.
    SendIndirectProbe {
        /// The current member being probed
        probed_id: T,
        /// See `TimerToken`
        token: TimerToken,
    },

    /// Transitions member T from Suspect to Down if the incarnation is
    /// still the same.
    ChangeSuspectToDown {
        /// Target member identity
        member_id: T,
        /// Its Incarnation the moment the suspicion was raised. If the
        /// member refutes the suspicion (by increasing its Incarnation),
        /// this won't match and it won't be declared Down.
        incarnation: Incarnation,
        /// See `TimerToken`
        token: TimerToken,
    },

    /// Sends a [`crate::Message::Announce`] to randomly chosen members as
    /// specified by [`crate::Config::periodic_announce`]
    PeriodicAnnounce(TimerToken),

    /// Sends a [`crate::Message::Announce`] to randomly chosen members
    /// that are considered [`crate::State::Down`] as specified by
    /// [`crate::Config::periodic_announce_to_down_members`]
    PeriodicAnnounceDown(TimerToken),

    /// Sends a [`crate::Message::Gossip`] to randomly chosen members as
    /// specified by [`crate::Config::periodic_gossip`]
    PeriodicGossip(TimerToken),

    /// Forgets about dead member `T`, allowing them to join the
    /// cluster again with the same identity.
    RemoveDown(T),
}
261
262impl<T> Timer<T> {
263 fn seq(&self) -> u8 {
264 match self {
265 Timer::SendIndirectProbe {
266 probed_id: _,
267 token: _,
268 } => 0,
269 Timer::ProbeRandomMember(_) => 1,
270 Timer::ChangeSuspectToDown {
271 member_id: _,
272 incarnation: _,
273 token: _,
274 } => 2,
275 Timer::PeriodicAnnounce(_) => 3,
276 Timer::PeriodicGossip(_) => 4,
277 Timer::RemoveDown(_) => 5,
278 Timer::PeriodicAnnounceDown(_) => 6,
279 }
280 }
281}
282
283impl<T: PartialEq> core::cmp::PartialOrd for Timer<T> {
284 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
285 self.seq().partial_cmp(&other.seq())
286 }
287}
288
289impl<T: Eq> core::cmp::Ord for Timer<T> {
290 fn cmp(&self, other: &Self) -> Ordering {
291 self.partial_cmp(other).expect("total ordering")
292 }
293}
294
/// `TimerToken` is simply a bookkeeping mechanism to try and prevent
/// reacting to scheduled events that aren't relevant anymore.
///
/// Certain interactions may cause Foca to decide to disregard every
/// event it scheduled previously — so it changes the token in order
/// to drop everything that doesn't match.
///
/// Similar in spirit to [`crate::ProbeNumber`].
pub type TimerToken = u8;
304
/// A `Runtime` implementation that's good enough for simple use-cases.
///
/// It accumulates all events that happen during an interaction with
/// `crate::Foca` and users must drain those and react accordingly.
///
/// Better runtimes would react directly to the events, instead of
/// needlessly storing the events in a queue.
///
/// Users must drain the runtime immediately after interacting with
/// foca.
///
/// See it in use at `examples/foca_insecure_udp_agent.rs`
pub struct AccumulatingRuntime<T> {
    // Outgoing packets in submission order: (destination, payload)
    to_send: VecDeque<(T, Bytes)>,
    // Timer events waiting to be handed back to foca: (delay, event)
    to_schedule: VecDeque<(Duration, Timer<T>)>,
    // Notifications in the order they were emitted
    notifications: VecDeque<OwnedNotification<T>>,
    // Scratch buffer used to assemble outgoing packets; its capacity
    // is reused across sends (see `Runtime::send_to` impl below)
    buf: BytesMut,
}
323
324impl<T> Default for AccumulatingRuntime<T> {
325 fn default() -> Self {
326 Self {
327 to_send: Default::default(),
328 to_schedule: Default::default(),
329 notifications: Default::default(),
330 buf: Default::default(),
331 }
332 }
333}
334
335impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
336 fn notify(&mut self, notification: Notification<'_, T>) {
337 self.notifications.push_back(notification.to_owned());
338 }
339
340 fn send_to(&mut self, to: T, data: &[u8]) {
341 self.buf.extend_from_slice(data);
342 let packet = self.buf.split().freeze();
343 self.to_send.push_back((to, packet));
344 }
345
346 fn submit_after(&mut self, event: Timer<T>, after: Duration) {
347 self.to_schedule.push_back((after, event));
348 }
349}
350
351impl<T> AccumulatingRuntime<T> {
352 /// Create a new `AccumulatingRuntime`
353 pub fn new() -> Self {
354 Self::default()
355 }
356
357 /// Yields data to be sent to a cluster member `T` in the
358 /// order they've happened.
359 ///
360 /// Users are expected to drain it until it yields `None`
361 /// after every interaction with `crate::Foca`
362 pub fn to_send(&mut self) -> Option<(T, Bytes)> {
363 self.to_send.pop_front()
364 }
365
366 /// Yields timer events and how far in the future they
367 /// must be given back to the foca instance that produced it
368 ///
369 /// Users are expected to drain it until it yields `None`
370 /// after every interaction with `crate::Foca`
371 pub fn to_schedule(&mut self) -> Option<(Duration, Timer<T>)> {
372 self.to_schedule.pop_front()
373 }
374
375 /// Yields event notifications in the order they've happened
376 ///
377 /// Users are expected to drain it until it yields `None`
378 /// after every interaction with `crate::Foca`
379 pub fn to_notify(&mut self) -> Option<OwnedNotification<T>> {
380 self.notifications.pop_front()
381 }
382
383 /// Returns how many unhandled events are left in this runtime
384 ///
385 /// Should be brought down to zero after every interaction with
386 /// `crate::Foca`
387 pub fn backlog(&self) -> usize {
388 self.to_send.len() + self.to_schedule.len() + self.notifications.len()
389 }
390}
391
#[cfg(test)]
impl<T: PartialEq> AccumulatingRuntime<T> {
    // Drops every queued event without handling it
    pub(crate) fn clear(&mut self) {
        self.to_send.clear();
        self.to_schedule.clear();
        self.notifications.clear();
    }

    // True when there is nothing left to drain
    pub(crate) fn is_empty(&self) -> bool {
        self.to_send.is_empty() && self.to_schedule.is_empty() && self.notifications.is_empty()
    }

    // Takes ownership of every queued outgoing packet
    pub(crate) fn take_all_data(&mut self) -> VecDeque<(T, Bytes)> {
        core::mem::take(&mut self.to_send)
    }

    // Removes and yields the first packet addressed to `dst`, if any
    pub(crate) fn take_data(&mut self, dst: T) -> Option<Bytes> {
        let idx = self.to_send.iter().position(|(to, _payload)| to == &dst)?;
        let (_, payload) = self.to_send.remove(idx)?;
        Some(payload)
    }

    // Removes and yields the first notification equal to `wanted`, if any
    pub(crate) fn take_notification(
        &mut self,
        wanted: OwnedNotification<T>,
    ) -> Option<OwnedNotification<T>> {
        let idx = self
            .notifications
            .iter()
            .position(|candidate| candidate == &wanted)?;

        self.notifications.remove(idx)
    }

    // Removes the first scheduling of `timer`, yielding its delay
    pub(crate) fn take_scheduling(&mut self, timer: Timer<T>) -> Option<Duration> {
        let idx = self
            .to_schedule
            .iter()
            .position(|(_when, event)| event == &timer)?;

        self.to_schedule.remove(idx).map(|(when, _)| when)
    }

    // Finds the first scheduled timer matching `predicate`, if any
    pub(crate) fn find_scheduling<F>(&self, predicate: F) -> Option<&Timer<T>>
    where
        F: Fn(&Timer<T>) -> bool,
    {
        for (_when, timer) in self.to_schedule.iter() {
            if predicate(timer) {
                return Some(timer);
            }
        }
        None
    }
}
445
#[cfg(test)]
mod tests {
    use super::Timer;

    #[test]
    fn timers_sort() {
        // What we really care about is SendIndirectProbe
        // appearing before ProbeRandomMember,
        // Foca tolerates other events in arbitrary order
        // without emitting scary errors / traces.
        let sorted = alloc::vec![
            Timer::<u8>::SendIndirectProbe {
                probed_id: 0,
                token: 0
            },
            Timer::<u8>::ProbeRandomMember(0),
            Timer::<u8>::ChangeSuspectToDown {
                member_id: 0,
                incarnation: 0,
                token: 0,
            },
            Timer::<u8>::RemoveDown(0),
        ];

        // Start from the exact reverse of the desired order and
        // verify that sorting restores it
        let mut timers = sorted.clone();
        timers.reverse();
        timers.sort_unstable();

        assert_eq!(sorted, timers);
    }
}
489}