use std::collections::HashMap;
use std::num::{NonZeroU8, NonZeroUsize};
use std::sync::Arc;
use std::time::Duration;
use bincode::DefaultOptions;
use bytes::Bytes;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use flume::Sender;
use foca::{BincodeCodec, Config as FocaConfig, Foca, Identity, Notification, Timer};
use foca::{NoCustomBroadcast, PeriodicParams};
use rand::{rngs::StdRng, SeedableRng};
use tokio::sync::RwLock;
use uhlc::ID;
use crate::protocol::{EldegossId, Sample};
use crate::quic::{gossip_msg, Link};
pub(crate) enum FocaEvent {
Data(Bytes),
Announce(EldegossId),
Timer(Timer<EldegossId>),
}
impl Identity for EldegossId {
type Addr = ID;
fn renew(&self) -> Option<Self> {
Some(Self::new(
uhlc::HLCBuilder::new()
.with_id(self.addr())
.build()
.new_timestamp(),
))
}
fn addr(&self) -> ID {
self.id()
}
fn win_addr_conflict(&self, adversary: &Self) -> bool {
self.clock() > adversary.clock()
}
}
pub(crate) type Membership =
Arc<RwLock<Foca<EldegossId, BincodeCodec<DefaultOptions>, StdRng, NoCustomBroadcast>>>;
pub(crate) async fn start_foca(
identity: EldegossId,
link_pool: Arc<RwLock<HashMap<EldegossId, Arc<Link>>>>,
) -> Result<(Membership, Sender<FocaEvent>)> {
let (foca_event_tx, foca_event_rv) = flume::bounded(1024);
let (outbound_foca_data_tx, outbound_foca_data_rv) = flume::bounded(1024);
let foca_config = {
let period = Duration::from_secs(15);
FocaConfig {
probe_period: period,
probe_rtt: Duration::from_secs(5),
num_indirect_probes: NonZeroUsize::new(5).unwrap(),
max_transmissions: NonZeroU8::new(10).unwrap(),
suspect_to_down_after: Duration::from_secs(30),
remove_down_after: Duration::from_secs(60 * 60 * 24),
max_packet_size: NonZeroUsize::new(1024 * 1024).unwrap(),
notify_down_members: true,
periodic_announce: Some(PeriodicParams {
frequency: Duration::from_secs(30),
num_members: NonZeroUsize::new(1).unwrap(),
}),
periodic_announce_to_down_members: Some(PeriodicParams {
frequency: Duration::from_secs(65),
num_members: NonZeroUsize::new(2).unwrap(),
}),
periodic_gossip: Some(PeriodicParams {
frequency: Duration::from_millis(200),
num_members: NonZeroUsize::new(3).unwrap(),
}),
}
};
let foca = Arc::new(RwLock::new(Foca::new(
identity.clone(),
foca_config,
StdRng::from_entropy(),
BincodeCodec(bincode::DefaultOptions::new()),
)));
let membership = foca.clone();
let mut runtime = foca::AccumulatingRuntime::new();
let foca_event_tx_ = foca_event_tx.clone();
tokio::spawn(async move {
while let Ok(input) = foca_event_rv.recv_async().await {
let result = match input {
FocaEvent::Timer(timer) => foca
.write()
.await
.handle_timer(timer, &mut runtime)
.map_err(|err| eyre!("Error handling timer: {err:?}")),
FocaEvent::Data(data) => foca
.write()
.await
.handle_data(&data, &mut runtime)
.map_err(|err| eyre!("Error handling data: {err:?}")),
FocaEvent::Announce(dst) => foca
.write()
.await
.announce(dst, &mut runtime)
.map_err(|err| eyre!("Error announce: {err:?}")),
};
if let Err(error) = result {
error!("foca handle event error: {error}");
}
while let Some((dst, data)) = runtime.to_send() {
outbound_foca_data_tx.send_async((dst, data)).await.ok();
}
while let Some((delay, event)) = runtime.to_schedule() {
let foca_event_tx_ = foca_event_tx_.clone();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
foca_event_tx_
.send_async(FocaEvent::Timer(event))
.await
.ok();
});
}
while let Some(notification) = runtime.to_notify() {
match notification {
Notification::MemberUp(eid) => info!("member up: {eid:?}"),
Notification::MemberDown(eid) => info!("member down: {eid:?}"),
Notification::Idle => info!("cluster empty"),
Notification::Rename(old, new) => {
info!("member {old:?} is now known as {new:?}")
}
Notification::Active => info!("current identity is active"),
Notification::Defunct => {
info!("current identity is defunct, need change identity")
}
Notification::Rejoin(eid) => info!("member rejoin: {eid:?}"),
}
let members = foca
.read()
.await
.iter_members()
.map(|member| member.id().to_owned())
.collect::<Vec<_>>();
info!("membership: {members:#?}");
}
}
});
tokio::spawn(async move {
while let Ok((_eid, data)) = outbound_foca_data_rv.recv_async().await {
gossip_msg(
Sample::new_foca(identity.hlc().new_timestamp(), data.clone()),
identity.id(),
link_pool.clone(),
)
.await;
}
});
Ok((membership, foca_event_tx))
}