use std::sync::atomic::Ordering;
use futures::{FutureExt, StreamExt};
use memberlist_core::{
CheapClone,
bytes::Bytes,
proto::{Data, MaybeResolvedAddress, Meta, Node, OneOrMore, SmallVec},
tracing,
};
use smol_str::SmolStr;
use crate::{
error::Error,
event::EventProducer,
types::{LeaveMessage, Member, Tags, UserEventMessage},
};
use super::*;
impl<T> Serf<T>
where
T: Transport,
{
pub async fn new(
transport: T::Options,
opts: Options,
) -> Result<Self, Error<T, DefaultDelegate<T>>> {
Self::new_in(
None,
None,
transport,
opts,
#[cfg(any(test, feature = "test"))]
None,
)
.await
}
pub async fn with_event_producer(
transport: T::Options,
opts: Options,
ev: EventProducer<T, DefaultDelegate<T>>,
) -> Result<Self, Error<T, DefaultDelegate<T>>> {
Self::new_in(
Some(ev.tx),
None,
transport,
opts,
#[cfg(any(test, feature = "test"))]
None,
)
.await
}
}
impl<T, D> Serf<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
pub async fn with_delegate(
transport: T::Options,
opts: Options,
delegate: D,
) -> Result<Self, Error<T, D>> {
Self::new_in(
None,
Some(delegate),
transport,
opts,
#[cfg(any(test, feature = "test"))]
None,
)
.await
}
pub async fn with_event_producer_and_delegate(
transport: T::Options,
opts: Options,
ev: EventProducer<T, D>,
delegate: D,
) -> Result<Self, Error<T, D>> {
Self::new_in(
Some(ev.tx),
Some(delegate),
transport,
opts,
#[cfg(any(test, feature = "test"))]
None,
)
.await
}
#[inline]
pub fn local_id(&self) -> &T::Id {
self.inner.memberlist.local_id()
}
#[inline]
pub fn advertise_node(&self) -> Node<T::Id, T::ResolvedAddress> {
self.inner.memberlist.advertise_node()
}
#[inline]
#[cfg(feature = "encryption")]
#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
pub fn encryption_enabled(&self) -> bool {
self.inner.memberlist.encryption_enabled()
}
#[inline]
pub fn shutdown_rx(&self) -> async_channel::Receiver<()> {
self.inner.shutdown_rx.clone()
}
#[inline]
pub fn state(&self) -> SerfState {
*self.inner.state.lock()
}
#[inline]
pub async fn members(&self) -> OneOrMore<Member<T::Id, T::ResolvedAddress>> {
self
.inner
.members
.read()
.await
.states
.values()
.map(|s| s.member.cheap_clone())
.collect()
}
#[inline]
pub async fn stats(&self) -> Stats {
let (num_members, num_failed, num_left, health_score) = {
let members = self.inner.members.read().await;
let num_members = members.states.len();
let num_failed = members.failed_members.len();
let num_left = members.left_members.len();
let health_score = self.inner.memberlist.health_score();
(num_members, num_failed, num_left, health_score)
};
#[cfg(not(feature = "encryption"))]
let encrypted = false;
#[cfg(feature = "encryption")]
let encrypted = self.inner.memberlist.encryption_enabled();
Stats {
members: num_members,
failed: num_failed,
left: num_left,
health_score,
member_time: self.inner.clock.time().into(),
event_time: self.inner.event_clock.time().into(),
query_time: self.inner.query_clock.time().into(),
intent_queue: self.inner.broadcasts.num_queued().await,
event_queue: self.inner.event_broadcasts.num_queued().await,
query_queue: self.inner.query_broadcasts.num_queued().await,
encrypted,
coordinate_resets: self
.inner
.coord_core
.as_ref()
.map(|coord| coord.client.stats().resets),
}
}
#[inline]
pub async fn num_members(&self) -> usize {
self.inner.members.read().await.states.len()
}
#[cfg(feature = "encryption")]
#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
#[inline]
pub fn key_manager(&self) -> &crate::key_manager::KeyManager<T, D> {
&self.inner.key_manager
}
#[inline]
pub async fn local_member(&self) -> Member<T::Id, T::ResolvedAddress> {
self
.inner
.members
.read()
.await
.states
.get(self.inner.memberlist.local_id())
.unwrap()
.member
.cheap_clone()
}
#[inline]
pub async fn set_tags(&self, tags: Tags) -> Result<(), Error<T, D>> {
let tags_encoded_len = tags.encoded_len_with_length_delimited();
if tags_encoded_len > Meta::MAX_SIZE {
return Err(Error::tags_too_large(tags_encoded_len));
}
self.inner.opts.tags.store(Arc::new(tags));
self
.inner
.memberlist
.update_node(self.inner.opts.broadcast_timeout)
.await
.map_err(From::from)
}
#[inline]
pub async fn user_event(
&self,
name: impl Into<SmolStr>,
payload: impl Into<Bytes>,
coalesce: bool,
) -> Result<(), Error<T, D>> {
let name: SmolStr = name.into();
let payload: Bytes = payload.into();
let payload_size_before_encoding = name.len() + payload.len();
if payload_size_before_encoding > self.inner.opts.max_user_event_size {
return Err(Error::user_event_limit_too_large(
self.inner.opts.max_user_event_size,
));
}
if payload_size_before_encoding > USER_EVENT_SIZE_LIMIT {
return Err(Error::user_event_too_large(USER_EVENT_SIZE_LIMIT));
}
let msg = UserEventMessage {
ltime: self.inner.event_clock.time(),
name: name.clone(),
payload,
cc: coalesce,
};
let len = crate::types::encoded_message_len(&msg);
if len > self.inner.opts.max_user_event_size {
return Err(Error::raw_user_event_too_large(len));
}
if len > USER_EVENT_SIZE_LIMIT {
return Err(Error::raw_user_event_too_large(len));
}
let raw = crate::types::encode_message_to_bytes(&msg)?;
self.inner.event_clock.increment();
self.handle_user_event(either::Either::Right(msg)).await;
self
.inner
.event_broadcasts
.queue_broadcast(SerfBroadcast {
msg: raw,
notify_tx: None,
})
.await;
Ok(())
}
pub async fn query(
&self,
name: impl Into<SmolStr>,
payload: impl Into<Bytes>,
params: Option<QueryParam<T::Id>>,
) -> Result<QueryResponse<T::Id, T::ResolvedAddress>, Error<T, D>> {
self
.query_in(name.into(), payload.into(), params, None)
.await
}
pub async fn join(
&self,
node: MaybeResolvedAddress<T::Address, T::ResolvedAddress>,
ignore_old: bool,
) -> Result<T::ResolvedAddress, Error<T, D>> {
let current_state = self.state();
if current_state != SerfState::Alive {
return Err(Error::bad_join_status(current_state));
}
let _join_lock = self.inner.join_lock.lock().await;
if ignore_old {
self.inner.event_join_ignore.store(true, Ordering::SeqCst);
}
match self.inner.memberlist.join(node).await {
Ok(node) => {
if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
if ignore_old {
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
}
return Err(e);
}
if ignore_old {
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
}
Ok(node)
}
Err(e) => {
if ignore_old {
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
}
Err(Error::from(e))
}
}
}
pub async fn join_many(
&self,
existing: impl Iterator<Item = MaybeResolvedAddress<T::Address, T::ResolvedAddress>>,
ignore_old: bool,
) -> Result<SmallVec<T::ResolvedAddress>, (SmallVec<T::ResolvedAddress>, Error<T, D>)> {
let current_state = self.state();
if current_state != SerfState::Alive {
return Err((SmallVec::new(), Error::bad_join_status(current_state)));
}
let _join_lock = self.inner.join_lock.lock().await;
if ignore_old {
self.inner.event_join_ignore.store(true, Ordering::SeqCst);
}
match self.inner.memberlist.join_many(existing).await {
Ok(joined) => {
if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
return Err((joined, e));
}
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
Ok(joined)
}
Err((joined, err)) => {
if !joined.is_empty() {
if let Err(e) = self.broadcast_join(self.inner.clock.time()).await {
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
return Err((
joined,
Error::Multiple(std::sync::Arc::from_iter([err.into(), e])),
));
}
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
Err((joined, Error::from(err)))
} else {
self.inner.event_join_ignore.store(false, Ordering::SeqCst);
Err((joined, Error::from(err)))
}
}
}
}
pub async fn leave(&self) -> Result<(), Error<T, D>> {
{
let mut s = self.inner.state.lock();
match *s {
SerfState::Left => return Ok(()),
SerfState::Leaving => return Err(Error::bad_leave_status(*s)),
SerfState::Shutdown => return Err(Error::bad_leave_status(*s)),
_ => {
*s = SerfState::Leaving;
}
}
}
if let Some(ref snap) = self.inner.snapshot {
snap.leave().await;
}
let msg = LeaveMessage {
ltime: self.inner.clock.time(),
id: self.inner.memberlist.local_id().cheap_clone(),
prune: false,
};
self.inner.clock.increment();
self.handle_node_leave_intent(&msg).await;
if self.has_alive_members().await {
let (notify_tx, notify_rx) = async_channel::bounded(1);
let msg = crate::types::encode_message_to_bytes(&msg)?;
self.broadcast(msg, Some(notify_tx)).await?;
futures::select! {
_ = notify_rx.recv().fuse() => {
}
_ = <T::Runtime as RuntimeLite>::sleep(self.inner.opts.broadcast_timeout).fuse() => {
tracing::warn!("serf: timeout while waiting for graceful leave");
}
}
}
if let Err(e) = self
.inner
.memberlist
.leave(self.inner.opts.broadcast_timeout)
.await
{
tracing::warn!("serf: timeout waiting for leave broadcast: {}", e);
}
<T::Runtime as RuntimeLite>::sleep(self.inner.opts.leave_propagate_delay).await;
{
let mut s = self.inner.state.lock();
match *s {
SerfState::Shutdown => {}
_ => {
*s = SerfState::Left;
}
}
}
Ok(())
}
pub async fn remove_failed_node(&self, id: T::Id) -> Result<(), Error<T, D>> {
self.force_leave(id, false).await
}
pub async fn remove_failed_node_prune(&self, id: T::Id) -> Result<(), Error<T, D>> {
self.force_leave(id, true).await
}
pub async fn shutdown(&self) -> Result<(), Error<T, D>> {
{
let mut s = self.inner.state.lock();
match *s {
SerfState::Shutdown => return Ok(()),
SerfState::Left => {}
_ => {
tracing::warn!("serf: shutdown without a leave");
}
}
*s = SerfState::Shutdown;
}
self.inner.memberlist.shutdown().await?;
self.inner.shutdown_tx.close();
if let Some(ref snap) = self.inner.snapshot {
snap.wait().await;
}
loop {
if let Ok(mut handles) = self.inner.handles.try_borrow_mut() {
let mut futs = core::mem::take(&mut *handles);
while futs.next().await.is_some() {}
break;
}
}
Ok(())
}
pub fn cooridate(&self) -> Result<Coordinate, Error<T, D>> {
if let Some(ref coord) = self.inner.coord_core {
return Ok(coord.client.get_coordinate());
}
Err(Error::coordinates_disabled())
}
pub fn cached_coordinate(&self, id: &T::Id) -> Result<Option<Coordinate>, Error<T, D>> {
if let Some(ref coord) = self.inner.coord_core {
return Ok(coord.cache.read().get(id).cloned());
}
Err(Error::coordinates_disabled())
}
#[inline]
pub fn memberlist(&self) -> &Memberlist<T, SerfDelegate<T, D>> {
&self.inner.memberlist
}
}
#[viewit::viewit(vis_all = "", getters(vis_all = "pub", prefix = "get"), setters(skip))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Stats {
members: usize,
failed: usize,
left: usize,
health_score: usize,
member_time: u64,
event_time: u64,
query_time: u64,
intent_queue: usize,
event_queue: usize,
query_queue: usize,
encrypted: bool,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
coordinate_resets: Option<usize>,
}