use alloc::collections::VecDeque;
use core::{cmp::Ordering, time::Duration};
use bytes::{Bytes, BytesMut};
#[cfg(feature = "unstable-notifications")]
use crate::{Header, ProbeNumber};
use crate::{Identity, Incarnation};
pub trait Runtime<T>
where
T: Identity,
{
fn notify(&mut self, notification: Notification<'_, T>);
fn send_to(&mut self, to: T, data: &[u8]);
fn submit_after(&mut self, event: Timer<T>, after: Duration);
}
impl<T, R> Runtime<T> for &mut R
where
T: Identity,
R: Runtime<T>,
{
fn notify(&mut self, notification: Notification<'_, T>) {
R::notify(self, notification);
}
fn send_to(&mut self, to: T, data: &[u8]) {
R::send_to(self, to, data);
}
fn submit_after(&mut self, event: Timer<T>, after: Duration) {
R::submit_after(self, event, after);
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum Notification<'a, T> {
MemberUp(&'a T),
MemberDown(&'a T),
Rename(&'a T, &'a T),
Active,
Idle,
Defunct,
Rejoin(&'a T),
#[cfg(feature = "unstable-notifications")]
DataReceived(&'a Header<T>),
#[cfg(feature = "unstable-notifications")]
DataSent(&'a Header<T>),
#[cfg(feature = "unstable-notifications")]
ProbeFailed(ProbeNumber, &'a T),
}
impl<T> Notification<'_, T>
where
T: Clone,
{
pub fn to_owned(self) -> OwnedNotification<T> {
match self {
Notification::MemberUp(m) => OwnedNotification::MemberUp(m.clone()),
Notification::MemberDown(m) => OwnedNotification::MemberDown(m.clone()),
Notification::Rename(before, after) => {
OwnedNotification::Rename(before.clone(), after.clone())
}
Notification::Active => OwnedNotification::Active,
Notification::Idle => OwnedNotification::Idle,
Notification::Defunct => OwnedNotification::Defunct,
Notification::Rejoin(id) => OwnedNotification::Rejoin(id.clone()),
#[cfg(feature = "unstable-notifications")]
Notification::DataReceived(h) => OwnedNotification::DataReceived(h.clone()),
#[cfg(feature = "unstable-notifications")]
Notification::DataSent(h) => OwnedNotification::DataSent(h.clone()),
#[cfg(feature = "unstable-notifications")]
Notification::ProbeFailed(n, m) => OwnedNotification::ProbeFailed(n, m.clone()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnedNotification<T> {
MemberUp(T),
MemberDown(T),
Rename(T, T),
Active,
Idle,
Defunct,
Rejoin(T),
#[cfg(feature = "unstable-notifications")]
DataReceived(Header<T>),
#[cfg(feature = "unstable-notifications")]
DataSent(Header<T>),
#[cfg(feature = "unstable-notifications")]
ProbeFailed(ProbeNumber, T),
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Timer<T> {
ProbeRandomMember(TimerToken),
SendIndirectProbe {
probed_id: T,
token: TimerToken,
},
ChangeSuspectToDown {
member_id: T,
incarnation: Incarnation,
token: TimerToken,
},
PeriodicAnnounce(TimerToken),
PeriodicAnnounceDown(TimerToken),
PeriodicGossip(TimerToken),
RemoveDown(T),
}
impl<T> Timer<T> {
const fn seq(&self) -> u8 {
match self {
Timer::SendIndirectProbe {
probed_id: _,
token: _,
} => 0,
Timer::ProbeRandomMember(_) => 1,
Timer::ChangeSuspectToDown {
member_id: _,
incarnation: _,
token: _,
} => 2,
Timer::PeriodicAnnounce(_) => 3,
Timer::PeriodicGossip(_) => 4,
Timer::RemoveDown(_) => 5,
Timer::PeriodicAnnounceDown(_) => 6,
}
}
}
impl<T: PartialEq> core::cmp::PartialOrd for Timer<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.seq().partial_cmp(&other.seq())
}
}
impl<T: Eq> core::cmp::Ord for Timer<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).expect("total ordering")
}
}
pub type TimerToken = u8;
pub struct AccumulatingRuntime<T> {
to_send: VecDeque<(T, Bytes)>,
to_schedule: VecDeque<(Duration, Timer<T>)>,
notifications: VecDeque<OwnedNotification<T>>,
buf: BytesMut,
}
impl<T> Default for AccumulatingRuntime<T> {
fn default() -> Self {
Self {
to_send: Default::default(),
to_schedule: Default::default(),
notifications: Default::default(),
buf: Default::default(),
}
}
}
impl<T: Identity> Runtime<T> for AccumulatingRuntime<T> {
fn notify(&mut self, notification: Notification<'_, T>) {
self.notifications.push_back(notification.to_owned());
}
fn send_to(&mut self, to: T, data: &[u8]) {
self.buf.extend_from_slice(data);
let packet = self.buf.split().freeze();
self.to_send.push_back((to, packet));
}
fn submit_after(&mut self, event: Timer<T>, after: Duration) {
self.to_schedule.push_back((after, event));
}
}
impl<T> AccumulatingRuntime<T> {
pub fn new() -> Self {
Self::default()
}
pub fn to_send(&mut self) -> Option<(T, Bytes)> {
self.to_send.pop_front()
}
pub fn to_schedule(&mut self) -> Option<(Duration, Timer<T>)> {
self.to_schedule.pop_front()
}
pub fn to_notify(&mut self) -> Option<OwnedNotification<T>> {
self.notifications.pop_front()
}
pub fn backlog(&self) -> usize {
self.to_send.len() + self.to_schedule.len() + self.notifications.len()
}
}
#[cfg(test)]
impl<T: PartialEq> AccumulatingRuntime<T> {
pub(crate) fn clear(&mut self) {
self.notifications.clear();
self.to_send.clear();
self.to_schedule.clear();
}
pub(crate) fn is_empty(&self) -> bool {
self.notifications.is_empty() && self.to_send.is_empty() && self.to_schedule.is_empty()
}
pub(crate) fn take_all_data(&mut self) -> VecDeque<(T, Bytes)> {
core::mem::take(&mut self.to_send)
}
pub(crate) fn take_data(&mut self, dst: T) -> Option<Bytes> {
let position = self.to_send.iter().position(|(to, _data)| to == &dst)?;
self.to_send.remove(position).map(|(_, data)| data)
}
pub(crate) fn take_notification(
&mut self,
wanted: OwnedNotification<T>,
) -> Option<OwnedNotification<T>> {
let position = self
.notifications
.iter()
.position(|notification| notification == &wanted)?;
self.notifications.remove(position)
}
pub(crate) fn take_scheduling(&mut self, timer: Timer<T>) -> Option<Duration> {
let position = self
.to_schedule
.iter()
.position(|(_when, event)| event == &timer)?;
self.to_schedule.remove(position).map(|(when, _)| when)
}
pub(crate) fn find_scheduling<F>(&self, predicate: F) -> Option<&Timer<T>>
where
F: Fn(&Timer<T>) -> bool,
{
self.to_schedule
.iter()
.find(|(_, timer)| predicate(timer))
.map(|(_, timer)| timer)
}
}
#[cfg(test)]
mod tests {
use super::Timer;
#[test]
fn timers_sort() {
let mut out_of_order = alloc::vec![
Timer::<u8>::RemoveDown(0),
Timer::<u8>::ChangeSuspectToDown {
member_id: 0,
incarnation: 0,
token: 0,
},
Timer::<u8>::ProbeRandomMember(0),
Timer::<u8>::SendIndirectProbe {
probed_id: 0,
token: 0
},
];
out_of_order.sort_unstable();
assert_eq!(
alloc::vec![
Timer::<u8>::SendIndirectProbe {
probed_id: 0,
token: 0
},
Timer::<u8>::ProbeRandomMember(0),
Timer::<u8>::ChangeSuspectToDown {
member_id: 0,
incarnation: 0,
token: 0,
},
Timer::<u8>::RemoveDown(0),
],
out_of_order
);
}
}