use std::{
borrow::Cow,
collections::HashSet,
fs::{File, OpenOptions},
io::{BufReader, BufWriter, Read, Seek, Write},
mem,
path::PathBuf,
time::Duration,
};
#[cfg(unix)]
use std::os::unix::prelude::OpenOptionsExt;
use crate::types::UserEventMessage;
use async_channel::{Receiver, Sender};
use byteorder::{LittleEndian, ReadBytesExt};
use futures::FutureExt;
use memberlist_core::{
CheapClone,
agnostic_lite::{AsyncSpawner, RuntimeLite},
bytes::{BufMut, BytesMut},
proto::{Data, MaybeResolvedAddress, TinyVec},
tracing,
transport::{Id, Node, Transport},
};
use rand::seq::SliceRandom;
use crate::{
delegate::Delegate,
event::{CrateEvent, MemberEvent, MemberEventType},
invalid_data_io_error,
types::{Epoch, LamportClock, LamportTime},
};
const FLUSH_INTERVAL: Duration = Duration::from_millis(500);
const CLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
const TMP_EXT: &str = "compact";
const SNAPSHOT_ERROR_RECOVERY_INTERVAL: Duration = Duration::from_secs(30);
const EVENT_CH_SIZE: usize = 2048;
const SHUTDOWN_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
const SNAPSHOT_BYTES_PER_NODE: usize = 128;
const SNAPSHOT_COMPACTION_THRESHOLD: usize = 2;
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
#[error("failed to open snapshot: {0}")]
Open(std::io::Error),
#[error("failed to open new snapshot: {0}")]
OpenNew(std::io::Error),
#[error("failed to flush new snapshot: {0}")]
FlushNew(std::io::Error),
#[error("failed to flush snapshot: {0}")]
Flush(std::io::Error),
#[error("failed to fsync snapshot: {0}")]
Sync(std::io::Error),
#[error("failed to stat snapshot: {0}")]
Stat(std::io::Error),
#[error("failed to remove old snapshot: {0}")]
Remove(std::io::Error),
#[error("failed to install new snapshot: {0}")]
Install(std::io::Error),
#[error("failed to write to new snapshot: {0}")]
WriteNew(std::io::Error),
#[error("failed to write to snapshot: {0}")]
Write(std::io::Error),
#[error("failed to seek to beginning of snapshot: {0}")]
SeekStart(std::io::Error),
#[error("failed to seek to end of snapshot: {0}")]
SeekEnd(std::io::Error),
#[error("failed to replay snapshot: {0}")]
Replay(std::io::Error),
#[error(transparent)]
UnknownRecordType(#[from] UnknownRecordType),
}
#[derive(Debug, thiserror::Error)]
#[error("unrecognized snapshot record type: {0}")]
pub struct UnknownRecordType(u8);
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
enum SnapshotRecordType {
Alive = 0,
NotAlive = 1,
Clock = 2,
EventClock = 3,
QueryClock = 4,
Coordinate = 5,
Leave = 6,
Comment = 7,
}
impl TryFrom<u8> for SnapshotRecordType {
type Error = UnknownRecordType;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(Self::Alive),
1 => Ok(Self::NotAlive),
2 => Ok(Self::Clock),
3 => Ok(Self::EventClock),
4 => Ok(Self::QueryClock),
5 => Ok(Self::Coordinate),
6 => Ok(Self::Leave),
7 => Ok(Self::Comment),
v => Err(UnknownRecordType(v)),
}
}
}
#[allow(dead_code)]
enum SnapshotRecord<'a, I: Clone, A: Clone> {
Alive(Cow<'a, Node<I, A>>),
NotAlive(Cow<'a, Node<I, A>>),
Clock(LamportTime),
EventClock(LamportTime),
QueryClock(LamportTime),
Coordinate,
Leave,
Comment,
}
const MAX_INLINED_BYTES: usize = 64;
macro_rules! encode {
($w:ident.$node: ident::$status: ident) => {{
let node = $node.as_ref();
let encoded_node_len = node.encoded_len();
let encoded_len = 4 + 1 + encoded_node_len;
if encoded_len <= MAX_INLINED_BYTES {
let mut buf = [0u8; MAX_INLINED_BYTES];
buf[0] = Self::$status;
buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes());
node.encode(&mut buf[5..]).map_err(invalid_data_io_error)?;
$w.write_all(&buf[..encoded_len]).map(|_| encoded_len)
} else {
let mut buf = BytesMut::with_capacity(encoded_len);
buf.put_u8(Self::$status);
buf.put_u32_le(encoded_node_len as u32);
node.encode(&mut buf).map_err(invalid_data_io_error)?;
$w.write_all(&buf).map(|_| encoded_len)
}
}};
($w:ident.$t: ident($status: ident)) => {{
const N: usize = mem::size_of::<u8>() + mem::size_of::<u64>();
let mut data = [0u8; N];
data[0] = Self::$status;
data[1..N].copy_from_slice(&$t.to_le_bytes());
$w.write_all(&data).map(|_| N)
}};
($w:ident.$ident: ident) => {{ $w.write_all(&[Self::$ident]).map(|_| 1) }};
}
impl<I, A> SnapshotRecord<'_, I, A>
where
I: Id + Data,
A: CheapClone + Data + Send + Sync + 'static,
{
const ALIVE: u8 = 0;
const NOT_ALIVE: u8 = 1;
const CLOCK: u8 = 2;
const EVENT_CLOCK: u8 = 3;
const QUERY_CLOCK: u8 = 4;
const COORDINATE: u8 = 5;
const LEAVE: u8 = 6;
const COMMENT: u8 = 7;
fn encode<W: Write>(&self, w: &mut W) -> std::io::Result<usize> {
match self {
Self::Alive(id) => encode!(w.id::ALIVE),
Self::NotAlive(id) => encode!(w.id::NOT_ALIVE),
Self::Clock(t) => encode!(w.t(CLOCK)),
Self::EventClock(t) => encode!(w.t(EVENT_CLOCK)),
Self::QueryClock(t) => encode!(w.t(QUERY_CLOCK)),
Self::Coordinate => encode!(w.COORDINATE),
Self::Leave => encode!(w.LEAVE),
Self::Comment => encode!(w.COMMENT),
}
}
}
#[viewit::viewit]
pub(crate) struct ReplayResult<I, A> {
alive_nodes: HashSet<Node<I, A>>,
last_clock: LamportTime,
last_event_clock: LamportTime,
last_query_clock: LamportTime,
offset: u64,
fh: File,
path: PathBuf,
}
pub(crate) fn open_and_replay_snapshot<
I: Id + Data,
A: CheapClone + Data + core::hash::Hash + Eq + Send + Sync + 'static,
P: AsRef<std::path::Path>,
>(
p: &P,
rejoin_after_leave: bool,
) -> Result<ReplayResult<I, A>, SnapshotError> {
#[cfg(unix)]
let fh = OpenOptions::new()
.create(true)
.append(true)
.read(true)
.mode(0o644)
.open(p)
.map_err(SnapshotError::Open)?;
#[cfg(not(unix))]
let fh = OpenOptions::new()
.create(true)
.append(true)
.read(true)
.write(true)
.open(p)
.map_err(SnapshotError::Open)?;
let offset = fh.metadata().map_err(SnapshotError::Stat)?.len();
let mut reader = BufReader::new(fh);
let mut buf = Vec::new();
let mut alive_nodes = HashSet::new();
let mut last_clock = LamportTime::ZERO;
let mut last_event_clock = LamportTime::ZERO;
let mut last_query_clock = LamportTime::ZERO;
loop {
let kind = match reader.read_u8() {
Ok(b) => SnapshotRecordType::try_from(b)?,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
break;
}
return Err(SnapshotError::Replay(e));
}
};
match kind {
SnapshotRecordType::Alive => {
let len = reader
.read_u32::<LittleEndian>()
.map_err(SnapshotError::Replay)? as usize;
buf.resize(len, 0);
reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
let (_, node) = <Node<I, A> as Data>::decode(&buf)
.map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
alive_nodes.insert(node);
}
SnapshotRecordType::NotAlive => {
let len = reader
.read_u32::<LittleEndian>()
.map_err(SnapshotError::Replay)? as usize;
buf.resize(len, 0);
reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
let (_, node) = <Node<I, A> as Data>::decode(&buf)
.map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
alive_nodes.remove(&node);
}
SnapshotRecordType::Clock => {
let t = reader
.read_u64::<LittleEndian>()
.map_err(SnapshotError::Replay)?;
last_clock = LamportTime::new(t);
}
SnapshotRecordType::EventClock => {
let t = reader
.read_u64::<LittleEndian>()
.map_err(SnapshotError::Replay)?;
last_event_clock = LamportTime::new(t);
}
SnapshotRecordType::QueryClock => {
let t = reader
.read_u64::<LittleEndian>()
.map_err(SnapshotError::Replay)?;
last_query_clock = LamportTime::new(t);
}
SnapshotRecordType::Coordinate => continue,
SnapshotRecordType::Leave => {
if rejoin_after_leave {
tracing::info!("serf: ignoring previous leave in snapshot");
continue;
}
alive_nodes.clear();
last_clock = LamportTime::ZERO;
last_event_clock = LamportTime::ZERO;
last_query_clock = LamportTime::ZERO;
}
SnapshotRecordType::Comment => continue,
}
}
let mut f = reader.into_inner();
f.seek(std::io::SeekFrom::End(0))
.map(|_| ReplayResult {
alive_nodes,
last_clock,
last_event_clock,
last_query_clock,
offset,
fh: f,
path: p.as_ref().to_path_buf(),
})
.map_err(SnapshotError::SeekEnd)
}
pub(crate) struct SnapshotHandle {
wait_rx: Receiver<()>,
shutdown_rx: Receiver<()>,
leave_tx: Sender<()>,
}
impl SnapshotHandle {
pub(crate) async fn wait(&self) {
let _ = self.wait_rx.recv().await;
}
pub(crate) async fn leave(&self) {
futures::select! {
_ = self.leave_tx.send(()).fuse() => {},
_ = self.shutdown_rx.recv().fuse() => {},
}
}
}
pub(crate) struct Snapshot<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
alive_nodes: HashSet<Node<T::Id, T::ResolvedAddress>>,
clock: LamportClock,
fh: Option<BufWriter<File>>,
last_flush: Epoch,
last_clock: LamportTime,
last_event_clock: LamportTime,
last_query_clock: LamportTime,
leave_rx: Receiver<()>,
leaving: bool,
min_compact_size: u64,
path: PathBuf,
offset: u64,
rejoin_after_leave: bool,
stream_rx: Receiver<CrateEvent<T, D>>,
shutdown_rx: Receiver<()>,
wait_tx: Sender<()>,
last_attempted_compaction: Epoch,
#[cfg(feature = "metrics")]
metric_labels: std::sync::Arc<memberlist_core::proto::MetricLabels>,
}
macro_rules! stream_flush_event {
($this:ident <- $event:ident) => {{
if $this.leaving {
break;
}
match &$event {
CrateEvent::Member(e) => $this.process_member_event(e),
CrateEvent::User(e) => $this.process_user_event(e),
CrateEvent::Query(e) => $this.process_query_event(e.ltime),
CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime),
}
}};
}
macro_rules! tee_stream_flush_event {
($stream_tx:ident <- $event:ident -> $out_tx:ident) => {{
futures::select! {
_ = $stream_tx.send($event.clone()).fuse() => {}
default => {}
}
futures::select! {
_ = $out_tx.send($event).fuse() => {}
default => {}
}
}};
}
impl<D, T> Snapshot<T, D>
where
D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
T: Transport,
{
#[allow(clippy::type_complexity)]
pub(crate) fn from_replay_result(
replay_result: ReplayResult<T::Id, T::ResolvedAddress>,
min_compact_size: u64,
rejoin_after_leave: bool,
clock: LamportClock,
out_tx: Sender<CrateEvent<T, D>>,
shutdown_rx: Receiver<()>,
#[cfg(feature = "metrics")] metric_labels: std::sync::Arc<memberlist_core::proto::MetricLabels>,
) -> Result<
(
Sender<CrateEvent<T, D>>,
TinyVec<Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
SnapshotHandle,
),
SnapshotError,
> {
let (in_tx, in_rx) = async_channel::bounded(EVENT_CH_SIZE);
let (stream_tx, stream_rx) = async_channel::bounded(EVENT_CH_SIZE);
let (leave_tx, leave_rx) = async_channel::bounded(1);
let (wait_tx, wait_rx) = async_channel::bounded(1);
let ReplayResult {
alive_nodes,
last_clock,
last_event_clock,
last_query_clock,
offset,
fh,
path,
} = replay_result;
let this = Self {
alive_nodes,
clock,
fh: Some(BufWriter::new(fh)),
last_flush: Epoch::now(),
last_clock,
last_event_clock,
last_query_clock,
leave_rx,
leaving: false,
min_compact_size,
path,
offset,
rejoin_after_leave,
stream_rx,
shutdown_rx: shutdown_rx.clone(),
wait_tx,
last_attempted_compaction: Epoch::now(),
#[cfg(feature = "metrics")]
metric_labels,
};
let mut alive_nodes = this
.alive_nodes
.iter()
.map(|n| {
let id = n.id().cheap_clone();
let addr = n.address().cheap_clone();
Node::new(id, MaybeResolvedAddress::resolved(addr))
})
.collect::<TinyVec<_>>();
alive_nodes.shuffle(&mut rand::rng());
let handle = <T::Runtime as RuntimeLite>::spawn(Self::tee_stream(
in_rx,
stream_tx,
out_tx,
shutdown_rx.clone(),
));
<T::Runtime as RuntimeLite>::spawn_detach(this.stream(handle));
Ok((
in_tx,
alive_nodes,
SnapshotHandle {
wait_rx,
shutdown_rx,
leave_tx,
},
))
}
async fn tee_stream(
in_rx: Receiver<CrateEvent<T, D>>,
stream_tx: Sender<CrateEvent<T, D>>,
out_tx: Sender<CrateEvent<T, D>>,
shutdown_rx: Receiver<()>,
) {
loop {
futures::select! {
ev = in_rx.recv().fuse() => {
if let Ok(ev) = ev {
tee_stream_flush_event!(stream_tx <- ev -> out_tx)
} else {
break;
}
}
_ = shutdown_rx.recv().fuse() => {
break;
}
}
}
loop {
futures::select! {
ev = in_rx.recv().fuse() => {
if let Ok(ev) = ev {
tee_stream_flush_event!(stream_tx <- ev -> out_tx)
} else {
break;
}
}
default => break,
}
}
tracing::debug!("serf: snapshotter tee stream exits");
}
fn handle_leave(&mut self) {
self.leaving = true;
if !self.rejoin_after_leave {
self.alive_nodes.clear();
}
self.try_append(SnapshotRecord::Leave);
if let Some(fh) = self.fh.as_mut() {
if let Err(e) = fh.flush() {
tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
}
if let Err(e) = fh.get_mut().sync_all() {
tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
}
}
}
async fn stream(
mut self,
tee_handle: <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>,
) {
let mut clock_ticker = <T::Runtime as RuntimeLite>::interval(CLOCK_UPDATE_INTERVAL);
loop {
futures::select! {
signal = self.leave_rx.recv().fuse() => {
if signal.is_ok() {
self.handle_leave();
}
}
ev = self.stream_rx.recv().fuse() => {
if let Ok(ev) = ev {
stream_flush_event!(self <- ev)
} else {
break;
}
}
_ = futures::StreamExt::next(&mut clock_ticker).fuse() => {
self.update_clock();
}
_ = self.shutdown_rx.recv().fuse() => {
break;
}
}
}
if self.leave_rx.try_recv().is_ok() {
self.handle_leave();
}
let flush_timeout = <T::Runtime as RuntimeLite>::sleep(SHUTDOWN_FLUSH_TIMEOUT);
futures::pin_mut!(flush_timeout);
self.update_clock();
loop {
futures::select! {
ev = self.stream_rx.recv().fuse() => {
if let Ok(ev) = ev {
stream_flush_event!(self <- ev)
} else {
break;
}
}
_ = (&mut flush_timeout).fuse() => {
break;
}
default => {
break;
}
}
}
if let Some(fh) = self.fh.as_mut() {
if let Err(e) = fh.flush() {
tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
}
if let Err(e) = fh.get_mut().sync_all() {
tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
}
}
self.wait_tx.close();
if let Err(e) = tee_handle.await {
tracing::error!(target="serf", err=%e, "failed to wait for tee stream to exit");
}
tracing::debug!("serf: snapshotter stream exits");
}
fn process_user_event(&mut self, e: &UserEventMessage) {
let ltime = e.ltime();
if ltime <= self.last_event_clock {
return;
}
self.last_event_clock = ltime;
self.try_append(SnapshotRecord::EventClock(ltime));
}
fn process_query_event(&mut self, ltime: LamportTime) {
if ltime <= self.last_query_clock {
return;
}
self.last_query_clock = ltime;
self.try_append(SnapshotRecord::QueryClock(ltime));
}
fn process_member_event(&mut self, e: &MemberEvent<T::Id, T::ResolvedAddress>) {
match e.ty {
MemberEventType::Join => {
for m in e.members() {
let node = m.node();
self.alive_nodes.insert(node.cheap_clone());
self.try_append(SnapshotRecord::Alive(Cow::Borrowed(node)))
}
}
MemberEventType::Leave | MemberEventType::Failed => {
for m in e.members() {
let node = m.node();
self.alive_nodes.remove(node);
self.try_append(SnapshotRecord::NotAlive(Cow::Borrowed(node)));
}
}
_ => {}
}
self.update_clock();
}
fn update_clock(&mut self) {
let t: u64 = self.clock.time().into();
let last_seen = LamportTime::from(t.saturating_sub(1));
if last_seen > self.last_clock {
self.last_clock = last_seen;
self.try_append(SnapshotRecord::Clock(self.last_clock));
}
}
fn try_append(&mut self, l: SnapshotRecord<'_, T::Id, T::ResolvedAddress>) {
if let Err(e) = self.append_line(l) {
tracing::error!(err = %e, "serf: failed to update snapshot");
if self.last_attempted_compaction.elapsed() > SNAPSHOT_ERROR_RECOVERY_INTERVAL {
self.last_attempted_compaction = Epoch::now();
tracing::info!("serf: attempting compaction to recover from error...");
if let Err(e) = self.compact() {
tracing::error!(err = %e, "serf: compaction failed, will reattempt after {}s", SNAPSHOT_ERROR_RECOVERY_INTERVAL.as_secs());
} else {
tracing::info!("serf: finished compaction, successfully recovered from error state");
}
}
}
}
fn append_line(
&mut self,
l: SnapshotRecord<'_, T::Id, T::ResolvedAddress>,
) -> Result<(), SnapshotError> {
#[cfg(feature = "metrics")]
let start = crate::types::Epoch::now();
#[cfg(feature = "metrics")]
let metric_labels = self.metric_labels.clone();
#[cfg(feature = "metrics")]
scopeguard::defer!(
metrics::histogram!("serf.snapshot.append_line", metric_labels.iter())
.record(start.elapsed().as_millis() as f64)
);
let f = self.fh.as_mut().unwrap();
let n = l.encode(f).map_err(SnapshotError::Write)?;
if self.last_flush.elapsed() > FLUSH_INTERVAL {
self.last_flush = Epoch::now();
self
.fh
.as_mut()
.unwrap()
.flush()
.map_err(SnapshotError::Flush)?;
}
self.offset += n as u64;
if self.offset > self.snapshot_max_size() {
self.compact()?;
}
Ok(())
}
fn snapshot_max_size(&self) -> u64 {
let nodes = self.alive_nodes.len() as u64;
let est_size = nodes * SNAPSHOT_BYTES_PER_NODE as u64;
let threshold = est_size * SNAPSHOT_COMPACTION_THRESHOLD as u64;
threshold.max(self.min_compact_size)
}
fn compact(&mut self) -> Result<(), SnapshotError> {
#[cfg(feature = "metrics")]
let start = crate::types::Epoch::now();
#[cfg(feature = "metrics")]
let metric_labels = self.metric_labels.clone();
#[cfg(feature = "metrics")]
scopeguard::defer!(
metrics::histogram!("serf.snapshot.compact", metric_labels.iter())
.record(start.elapsed().as_millis() as f64)
);
let new_path = self.path.with_extension(TMP_EXT);
#[cfg(unix)]
let fh = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.mode(0o755)
.open(&new_path)
.map_err(SnapshotError::OpenNew)?;
#[cfg(not(unix))]
let fh = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&new_path)
.map_err(SnapshotError::OpenNew)?;
let mut buf = BufWriter::new(fh);
let mut offset = 0u64;
for node in self.alive_nodes.iter() {
offset += SnapshotRecord::Alive(Cow::Borrowed(node))
.encode(&mut buf)
.map_err(SnapshotError::WriteNew)? as u64;
}
offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::Clock(self.last_clock)
.encode(&mut buf)
.map_err(SnapshotError::WriteNew)? as u64;
offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::EventClock(self.last_event_clock)
.encode(&mut buf)
.map_err(SnapshotError::WriteNew)? as u64;
offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::QueryClock(self.last_query_clock)
.encode(&mut buf)
.map_err(SnapshotError::WriteNew)? as u64;
buf.flush().map_err(SnapshotError::Flush)?;
buf.get_ref().sync_all().map_err(SnapshotError::Sync)?;
drop(buf);
let mut old = self.fh.take().unwrap();
let _ = old.flush();
drop(old);
if let Err(e) = std::fs::remove_file(&self.path) {
if !matches!(e.kind(), std::io::ErrorKind::NotFound) {
return Err(SnapshotError::Remove(e));
}
}
std::fs::rename(&new_path, &self.path).map_err(SnapshotError::Install)?;
#[cfg(unix)]
let fh = OpenOptions::new()
.create(true)
.append(true)
.read(true)
.mode(0o755)
.open(&self.path)
.map_err(SnapshotError::Open)?;
#[cfg(not(unix))]
let fh = OpenOptions::new()
.create(true)
.append(true)
.read(true)
.write(true)
.open(&self.path)
.map_err(SnapshotError::Open)?;
self.fh = Some(BufWriter::new(fh));
self.offset = offset;
self.last_flush = Epoch::now();
Ok(())
}
}