serf-core 0.5.0

A decentralized solution for service discovery and orchestration that is lightweight, highly available, and fault tolerant.
Documentation
use async_channel::Sender;
use memberlist_core::{Broadcast, bytes::Bytes};

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub(crate) struct BroadcastId;

impl core::fmt::Display for BroadcastId {
  fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    Ok(())
  }
}

#[viewit::viewit]
#[derive(Debug)]
pub(crate) struct SerfBroadcast {
  msg: Bytes,
  notify_tx: Option<Sender<()>>,
}

impl Broadcast for SerfBroadcast {
  type Id = BroadcastId;
  type Message = Bytes;

  fn id(&self) -> Option<&Self::Id> {
    None
  }

  fn invalidates(&self, _other: &Self) -> bool {
    false
  }

  fn message(&self) -> &Self::Message {
    &self.msg
  }

  async fn finished(&self) {
    if let Some(ref tx) = self.notify_tx {
      let _ = tx.send(()).await;
    }
  }

  fn encoded_len(msg: &Self::Message) -> usize {
    msg.len()
  }
}

#[cfg(test)]
#[tokio::test]
async fn test_broadcast_finished() {
  use futures::{self, FutureExt};
  use std::time::Duration;

  let (tx, rx) = async_channel::unbounded();

  let b = SerfBroadcast {
    msg: Bytes::new(),
    notify_tx: Some(tx),
  };

  b.finished().await;

  futures::select! {
    _ = rx.recv().fuse() => {}
    _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {
      panic!("expected broadcast to be finished")
    }
  }
}

#[cfg(test)]
#[tokio::test]
async fn test_broadcast_finished_no_sender() {
  let b = SerfBroadcast {
    msg: Bytes::new(),
    notify_tx: None,
  };

  b.finished().await;
}