// serf_core/snapshot.rs

1use std::{
2  borrow::Cow,
3  collections::HashSet,
4  fs::{File, OpenOptions},
5  io::{BufReader, BufWriter, Read, Seek, Write},
6  mem,
7  path::PathBuf,
8  time::Duration,
9};
10
11#[cfg(unix)]
12use std::os::unix::prelude::OpenOptionsExt;
13
14use async_channel::{Receiver, Sender};
15use byteorder::{LittleEndian, ReadBytesExt};
16use futures::FutureExt;
17use memberlist_core::{
18  agnostic_lite::{AsyncSpawner, RuntimeLite},
19  bytes::{BufMut, BytesMut},
20  tracing,
21  transport::{AddressResolver, Id, MaybeResolvedAddress, Node, Transport},
22  types::TinyVec,
23  CheapClone,
24};
25use rand::seq::SliceRandom;
26use serf_types::UserEventMessage;
27
28use crate::{
29  delegate::{Delegate, TransformDelegate},
30  event::{CrateEvent, MemberEvent, MemberEventType},
31  invalid_data_io_error,
32  types::{Epoch, LamportClock, LamportTime},
33};
34
/// Longest time appended records may sit in the write buffer before the
/// next append forces a flush to the snapshot file.
const FLUSH_INTERVAL: Duration = Duration::from_millis(500);

/// Period of the ticker that samples the cluster's Lamport clock and records
/// it in the snapshot file.
const CLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(500);

/// Extension of the temporary file a compaction writes before it replaces
/// the live snapshot.
const TMP_EXT: &str = "compact";

/// Minimum spacing between compaction attempts made to recover from a
/// failed write to the snapshot file.
const SNAPSHOT_ERROR_RECOVERY_INTERVAL: Duration = Duration::from_millis(30_000);

/// Capacity of the bounded event channels between Serf and the consuming
/// application. Once full, senders presumably block (Serf and Memberlist).
const EVENT_CH_SIZE: usize = 2 * 1024;

/// How long shutdown keeps draining pending events into the snapshot.
const SHUTDOWN_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);

/// Estimated on-disk bytes needed per alive node.
const SNAPSHOT_BYTES_PER_NODE: usize = 128;

/// Multiplier applied to the size estimate (nodes * bytes per node) to get
/// the file size that triggers a compaction.
const SNAPSHOT_COMPACTION_THRESHOLD: usize = 2;
61
62/// Errors that can occur while interacting with snapshots
63#[derive(Debug, thiserror::Error)]
64pub enum SnapshotError {
65  /// Returned when opening a snapshot fails
66  #[error("failed to open snapshot: {0}")]
67  Open(std::io::Error),
68  /// Returned when opening a new snapshot fails
69  #[error("failed to open new snapshot: {0}")]
70  OpenNew(std::io::Error),
71  /// Returned when flush new snapshot fails
72  #[error("failed to flush new snapshot: {0}")]
73  FlushNew(std::io::Error),
74  /// Returned when flush snapshot fails
75  #[error("failed to flush snapshot: {0}")]
76  Flush(std::io::Error),
77  /// Returned when fsync snapshot fails
78  #[error("failed to fsync snapshot: {0}")]
79  Sync(std::io::Error),
80  /// Returned when stat snapshot fails
81  #[error("failed to stat snapshot: {0}")]
82  Stat(std::io::Error),
83  /// Returned when remove old snapshot fails
84  #[error("failed to remove old snapshot: {0}")]
85  Remove(std::io::Error),
86  /// Returned when installing a new snapshot fails
87  #[error("failed to install new snapshot: {0}")]
88  Install(std::io::Error),
89  /// Returned when writing to a new snapshot fails
90  #[error("failed to write to new snapshot: {0}")]
91  WriteNew(std::io::Error),
92  /// Returned when writing to a snapshot fails
93  #[error("failed to write to snapshot: {0}")]
94  Write(std::io::Error),
95  /// Returned when seek to start of a snapshot fails
96  #[error("failed to seek to beginning of snapshot: {0}")]
97  SeekStart(std::io::Error),
98  /// Returned when seek to end of a snapshot fails
99  #[error("failed to seek to end of snapshot: {0}")]
100  SeekEnd(std::io::Error),
101  /// Returned when replaying a snapshot fails
102  #[error("failed to replay snapshot: {0}")]
103  Replay(std::io::Error),
104  /// Returned when fail to decode snapshot record type.
105  #[error(transparent)]
106  UnknownRecordType(#[from] UnknownRecordType),
107}
108
109/// UnknownRecordType is used to indicate that we encountered an unknown
110/// record type while reading a snapshot file.
111#[derive(Debug, thiserror::Error)]
112#[error("unrecognized snapshot record type: {0}")]
113pub struct UnknownRecordType(u8);
114
/// Tag byte that starts every record in the snapshot file. The explicit
/// discriminants are the on-disk values and must never be renumbered.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
enum SnapshotRecordType {
  /// Followed by a u32 (LE) length and an encoded node that is alive.
  Alive = 0,
  /// Followed by a u32 (LE) length and an encoded node that left or failed.
  NotAlive = 1,
  /// Followed by a u64 (LE) member Lamport time.
  Clock = 2,
  /// Followed by a u64 (LE) user-event Lamport time.
  EventClock = 3,
  /// Followed by a u64 (LE) query Lamport time.
  QueryClock = 4,
  /// Tag only; skipped on replay.
  Coordinate = 5,
  /// Tag only; marks that this node left the cluster.
  Leave = 6,
  /// Tag only; skipped on replay.
  Comment = 7,
}
127
128impl TryFrom<u8> for SnapshotRecordType {
129  type Error = UnknownRecordType;
130
131  fn try_from(value: u8) -> Result<Self, Self::Error> {
132    match value {
133      0 => Ok(Self::Alive),
134      1 => Ok(Self::NotAlive),
135      2 => Ok(Self::Clock),
136      3 => Ok(Self::EventClock),
137      4 => Ok(Self::QueryClock),
138      5 => Ok(Self::Coordinate),
139      6 => Ok(Self::Leave),
140      7 => Ok(Self::Comment),
141      v => Err(UnknownRecordType(v)),
142    }
143  }
144}
145
146#[allow(dead_code)]
147enum SnapshotRecord<'a, I: Clone, A: Clone> {
148  Alive(Cow<'a, Node<I, A>>),
149  NotAlive(Cow<'a, Node<I, A>>),
150  Clock(LamportTime),
151  EventClock(LamportTime),
152  QueryClock(LamportTime),
153  Coordinate,
154  Leave,
155  Comment,
156}
157
/// Node records whose full encoding (tag + length prefix + node) fits in
/// this many bytes are assembled on the stack instead of the heap.
const MAX_INLINED_BYTES: usize = 1 << 6;
159
/// Serializes one `SnapshotRecord` variant into writer `$w` and evaluates to
/// `std::io::Result<usize>` holding the number of bytes written.
///
/// It must be expanded inside `SnapshotRecord::encode`, where the transform
/// delegate `T` and the `Self::<TAG>` constants are in scope.
///
/// Layouts:
/// - node:  `tag: u8 | len: u32 LE | encoded node (len bytes)`
/// - clock: `tag: u8 | time: u64 LE`
/// - bare:  `tag: u8`
macro_rules! encode {
  ($w:ident.$node: ident::$status: ident) => {{
    let node = $node.as_ref();
    let encoded_node_len = T::node_encoded_len(node);
    // The length prefix is a u32. Refuse oversized nodes rather than
    // silently truncating the prefix, which would corrupt the file.
    let len_prefix = u32::try_from(encoded_node_len)
      .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    let encoded_len = 4 + 1 + encoded_node_len;
    if encoded_len <= MAX_INLINED_BYTES {
      let mut buf = [0u8; MAX_INLINED_BYTES];
      buf[0] = Self::$status;
      buf[1..5].copy_from_slice(&len_prefix.to_le_bytes());
      T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?;
      $w.write_all(&buf[..encoded_len]).map(|_| encoded_len)
    } else {
      let mut buf = BytesMut::with_capacity(encoded_len);
      buf.put_u8(Self::$status);
      buf.put_u32_le(len_prefix);
      // Grow to the full record length first. Otherwise the buffer's length
      // is only the 5 header bytes, and a slice handed to `encode_node`
      // would alias the header instead of the space after it.
      buf.resize(encoded_len, 0);
      T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?;
      $w.write_all(&buf).map(|_| encoded_len)
    }
  }};
  ($w:ident.$t: ident($status: ident)) => {{
    const N: usize = mem::size_of::<u8>() + mem::size_of::<u64>();
    let mut data = [0u8; N];
    data[0] = Self::$status;
    data[1..N].copy_from_slice(&$t.to_le_bytes());
    $w.write_all(&data).map(|_| N)
  }};
  ($w:ident.$ident: ident) => {{
    $w.write_all(&[Self::$ident]).map(|_| 1)
  }};
}
190
191impl<I, A> SnapshotRecord<'_, I, A>
192where
193  I: Id,
194  A: CheapClone + Send + Sync + 'static,
195{
196  const ALIVE: u8 = 0;
197  const NOT_ALIVE: u8 = 1;
198  const CLOCK: u8 = 2;
199  const EVENT_CLOCK: u8 = 3;
200  const QUERY_CLOCK: u8 = 4;
201  const COORDINATE: u8 = 5;
202  const LEAVE: u8 = 6;
203  const COMMENT: u8 = 7;
204
205  fn encode<T: TransformDelegate<Id = I, Address = A>, W: Write>(
206    &self,
207    w: &mut W,
208  ) -> std::io::Result<usize> {
209    match self {
210      Self::Alive(id) => encode!(w.id::ALIVE),
211      Self::NotAlive(id) => encode!(w.id::NOT_ALIVE),
212      Self::Clock(t) => encode!(w.t(CLOCK)),
213      Self::EventClock(t) => encode!(w.t(EVENT_CLOCK)),
214      Self::QueryClock(t) => encode!(w.t(QUERY_CLOCK)),
215      Self::Coordinate => encode!(w.COORDINATE),
216      Self::Leave => encode!(w.LEAVE),
217      Self::Comment => encode!(w.COMMENT),
218    }
219  }
220}
221
222#[viewit::viewit]
223pub(crate) struct ReplayResult<I, A> {
224  alive_nodes: HashSet<Node<I, A>>,
225  last_clock: LamportTime,
226  last_event_clock: LamportTime,
227  last_query_clock: LamportTime,
228  offset: u64,
229  fh: File,
230  path: PathBuf,
231}
232
233pub(crate) fn open_and_replay_snapshot<
234  I: Id,
235  A: CheapClone + core::hash::Hash + Eq + Send + Sync + 'static,
236  T: TransformDelegate<Id = I, Address = A>,
237  P: AsRef<std::path::Path>,
238>(
239  p: &P,
240  rejoin_after_leave: bool,
241) -> Result<ReplayResult<I, A>, SnapshotError> {
242  // Try to open the file
243  #[cfg(unix)]
244  let fh = OpenOptions::new()
245    .create(true)
246    .append(true)
247    .read(true)
248    .mode(0o644)
249    .open(p)
250    .map_err(SnapshotError::Open)?;
251  #[cfg(not(unix))]
252  let fh = OpenOptions::new()
253    .create(true)
254    .append(true)
255    .read(true)
256    .write(true)
257    .open(p)
258    .map_err(SnapshotError::Open)?;
259
260  // Determine the offset
261  let offset = fh.metadata().map_err(SnapshotError::Stat)?.len();
262
263  // Read each line
264  let mut reader = BufReader::new(fh);
265  let mut buf = Vec::new();
266  let mut alive_nodes = HashSet::new();
267  let mut last_clock = LamportTime::ZERO;
268  let mut last_event_clock = LamportTime::ZERO;
269  let mut last_query_clock = LamportTime::ZERO;
270
271  loop {
272    let kind = match reader.read_u8() {
273      Ok(b) => SnapshotRecordType::try_from(b)?,
274      Err(e) => {
275        if e.kind() == std::io::ErrorKind::UnexpectedEof {
276          break;
277        }
278        return Err(SnapshotError::Replay(e));
279      }
280    };
281
282    match kind {
283      SnapshotRecordType::Alive => {
284        let len = reader
285          .read_u32::<LittleEndian>()
286          .map_err(SnapshotError::Replay)? as usize;
287        buf.resize(len, 0);
288        reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
289
290        let (_, node) =
291          T::decode_node(&buf).map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
292        alive_nodes.insert(node);
293      }
294      SnapshotRecordType::NotAlive => {
295        let len = reader
296          .read_u32::<LittleEndian>()
297          .map_err(SnapshotError::Replay)? as usize;
298        buf.resize(len, 0);
299        reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
300
301        let (_, node) =
302          T::decode_node(&buf).map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
303        alive_nodes.remove(&node);
304      }
305      SnapshotRecordType::Clock => {
306        let t = reader
307          .read_u64::<LittleEndian>()
308          .map_err(SnapshotError::Replay)?;
309        last_clock = LamportTime::new(t);
310      }
311      SnapshotRecordType::EventClock => {
312        let t = reader
313          .read_u64::<LittleEndian>()
314          .map_err(SnapshotError::Replay)?;
315        last_event_clock = LamportTime::new(t);
316      }
317      SnapshotRecordType::QueryClock => {
318        let t = reader
319          .read_u64::<LittleEndian>()
320          .map_err(SnapshotError::Replay)?;
321        last_query_clock = LamportTime::new(t);
322      }
323      SnapshotRecordType::Coordinate => continue,
324      SnapshotRecordType::Leave => {
325        // Ignore a leave if we plan on re-joining
326        if rejoin_after_leave {
327          tracing::info!("serf: ignoring previous leave in snapshot");
328          continue;
329        }
330        alive_nodes.clear();
331        last_clock = LamportTime::ZERO;
332        last_event_clock = LamportTime::ZERO;
333        last_query_clock = LamportTime::ZERO;
334      }
335      SnapshotRecordType::Comment => continue,
336    }
337  }
338
339  // Seek to the end
340  let mut f = reader.into_inner();
341
342  f.seek(std::io::SeekFrom::End(0))
343    .map(|_| ReplayResult {
344      alive_nodes,
345      last_clock,
346      last_event_clock,
347      last_query_clock,
348      offset,
349      fh: f,
350      path: p.as_ref().to_path_buf(),
351    })
352    .map_err(SnapshotError::SeekEnd)
353}
354
355pub(crate) struct SnapshotHandle {
356  wait_rx: Receiver<()>,
357  shutdown_rx: Receiver<()>,
358  leave_tx: Sender<()>,
359}
360
361impl SnapshotHandle {
362  /// Used to wait until the snapshotter finishes shut down
363  pub(crate) async fn wait(&self) {
364    let _ = self.wait_rx.recv().await;
365  }
366
367  /// Used to remove known nodes to prevent a restart from
368  /// causing a join. Otherwise nodes will re-join after leaving!
369  pub(crate) async fn leave(&self) {
370    futures::select! {
371      _ = self.leave_tx.send(()).fuse() => {},
372      _ = self.shutdown_rx.recv().fuse() => {},
373    }
374  }
375}
376
377/// Responsible for ingesting events and persisting
378/// them to disk, and providing a recovery mechanism at start time.
379pub(crate) struct Snapshot<T, D>
380where
381  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
382  T: Transport,
383{
384  alive_nodes: HashSet<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
385  clock: LamportClock,
386  fh: Option<BufWriter<File>>,
387  last_flush: Epoch,
388  last_clock: LamportTime,
389  last_event_clock: LamportTime,
390  last_query_clock: LamportTime,
391  leave_rx: Receiver<()>,
392  leaving: bool,
393  min_compact_size: u64,
394  path: PathBuf,
395  offset: u64,
396  rejoin_after_leave: bool,
397  stream_rx: Receiver<CrateEvent<T, D>>,
398  shutdown_rx: Receiver<()>,
399  wait_tx: Sender<()>,
400  last_attempted_compaction: Epoch,
401  #[cfg(feature = "metrics")]
402  metric_labels: std::sync::Arc<memberlist_core::types::MetricLabels>,
403}
404
// Records a single event in the snapshot. Once a leave has been issued the
// event is skipped. The surrounding event loop must keep running after a
// leave (the previous `break` tore down the whole stream loop on the first
// post-leave event), so the macro only skips instead of breaking.
macro_rules! stream_flush_event {
  ($this:ident <- $event:ident) => {{
    // Stop recording events after a leave is issued.
    if !$this.leaving {
      match &$event {
        CrateEvent::Member(e) => $this.process_member_event(e),
        CrateEvent::User(e) => $this.process_user_event(e),
        CrateEvent::Query(e) => $this.process_query_event(e.ltime),
        CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime),
      }
    }
  }};
}
421
// Fans one event out to the snapshotter (`$stream_tx`) and the application
// (`$out_tx`). Each send future is polled exactly once and then dropped, so
// a full or closed channel simply loses that copy instead of blocking.
macro_rules! tee_stream_flush_event {
  ($stream_tx:ident <- $event:ident -> $out_tx:ident) => {{
    // Internal snapshot stream first, with a copy of the event.
    let _ = $stream_tx.send($event.clone()).now_or_never();
    // Then the original goes to the consumer.
    let _ = $out_tx.send($event).now_or_never();
  }};
}
437
438impl<D, T> Snapshot<T, D>
439where
440  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
441  T: Transport,
442{
443  #[allow(clippy::type_complexity)]
444  pub(crate) fn from_replay_result(
445    replay_result: ReplayResult<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
446    min_compact_size: u64,
447    rejoin_after_leave: bool,
448    clock: LamportClock,
449    out_tx: Sender<CrateEvent<T, D>>,
450    shutdown_rx: Receiver<()>,
451    #[cfg(feature = "metrics")] metric_labels: std::sync::Arc<memberlist_core::types::MetricLabels>,
452  ) -> Result<
453    (
454      Sender<CrateEvent<T, D>>,
455      TinyVec<Node<T::Id, MaybeResolvedAddress<T>>>,
456      SnapshotHandle,
457    ),
458    SnapshotError,
459  > {
460    let (in_tx, in_rx) = async_channel::bounded(EVENT_CH_SIZE);
461    let (stream_tx, stream_rx) = async_channel::bounded(EVENT_CH_SIZE);
462    let (leave_tx, leave_rx) = async_channel::bounded(1);
463    let (wait_tx, wait_rx) = async_channel::bounded(1);
464
465    let ReplayResult {
466      alive_nodes,
467      last_clock,
468      last_event_clock,
469      last_query_clock,
470      offset,
471      fh,
472      path,
473    } = replay_result;
474
475    // Create the snapshotter
476    let this = Self {
477      alive_nodes,
478      clock,
479      fh: Some(BufWriter::new(fh)),
480      last_flush: Epoch::now(),
481      last_clock,
482      last_event_clock,
483      last_query_clock,
484      leave_rx,
485      leaving: false,
486      min_compact_size,
487      path,
488      offset,
489      rejoin_after_leave,
490      stream_rx,
491      shutdown_rx: shutdown_rx.clone(),
492      wait_tx,
493      last_attempted_compaction: Epoch::now(),
494      #[cfg(feature = "metrics")]
495      metric_labels,
496    };
497
498    let mut alive_nodes = this
499      .alive_nodes
500      .iter()
501      .map(|n| {
502        let id = n.id().cheap_clone();
503        let addr = n.address().cheap_clone();
504        Node::new(id, MaybeResolvedAddress::resolved(addr))
505      })
506      .collect::<TinyVec<_>>();
507    alive_nodes.shuffle(&mut rand::thread_rng());
508
509    // Start handling new commands
510    let handle = <T::Runtime as RuntimeLite>::spawn(Self::tee_stream(
511      in_rx,
512      stream_tx,
513      out_tx,
514      shutdown_rx.clone(),
515    ));
516    <T::Runtime as RuntimeLite>::spawn_detach(this.stream(handle));
517
518    Ok((
519      in_tx,
520      alive_nodes,
521      SnapshotHandle {
522        wait_rx,
523        shutdown_rx,
524        leave_tx,
525      },
526    ))
527  }
528
529  /// A long running routine that is used to copy events
530  /// to the output channel and the internal event handler.
531  async fn tee_stream(
532    in_rx: Receiver<CrateEvent<T, D>>,
533    stream_tx: Sender<CrateEvent<T, D>>,
534    out_tx: Sender<CrateEvent<T, D>>,
535    shutdown_rx: Receiver<()>,
536  ) {
537    loop {
538      futures::select! {
539        ev = in_rx.recv().fuse() => {
540          if let Ok(ev) = ev {
541            tee_stream_flush_event!(stream_tx <- ev -> out_tx)
542          } else {
543            break;
544          }
545        }
546        _ = shutdown_rx.recv().fuse() => {
547          break;
548        }
549      }
550    }
551
552    // Drain any remaining events before exiting
553    loop {
554      futures::select! {
555        ev = in_rx.recv().fuse() => {
556          if let Ok(ev) = ev {
557            tee_stream_flush_event!(stream_tx <- ev -> out_tx)
558          } else {
559            break;
560          }
561        }
562        default => break,
563      }
564    }
565    tracing::debug!("serf: snapshotter tee stream exits");
566  }
567
568  fn handle_leave(&mut self) {
569    self.leaving = true;
570
571    // If we plan to re-join, keep our state
572    if !self.rejoin_after_leave {
573      self.alive_nodes.clear();
574    }
575    self.try_append(SnapshotRecord::Leave);
576    if let Some(fh) = self.fh.as_mut() {
577      if let Err(e) = fh.flush() {
578        tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
579      }
580
581      if let Err(e) = fh.get_mut().sync_all() {
582        tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
583      }
584    }
585  }
586
587  /// Long running routine that is used to handle events
588  async fn stream(
589    mut self,
590    tee_handle: <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>,
591  ) {
592    let mut clock_ticker = <T::Runtime as RuntimeLite>::interval(CLOCK_UPDATE_INTERVAL);
593
594    loop {
595      futures::select! {
596        signal = self.leave_rx.recv().fuse() => {
597          if signal.is_ok() {
598            self.handle_leave();
599          }
600        }
601        ev = self.stream_rx.recv().fuse() => {
602          if let Ok(ev) = ev {
603            stream_flush_event!(self <- ev)
604          } else {
605            break;
606          }
607        }
608        _ = futures::StreamExt::next(&mut clock_ticker).fuse() => {
609          self.update_clock();
610        }
611        _ = self.shutdown_rx.recv().fuse() => {
612          break;
613        }
614      }
615    }
616
617    if self.leave_rx.try_recv().is_ok() {
618      self.handle_leave();
619    }
620
621    // Setup a timeout
622    let flush_timeout = <T::Runtime as RuntimeLite>::sleep(SHUTDOWN_FLUSH_TIMEOUT);
623    futures::pin_mut!(flush_timeout);
624
625    // snapshot the clock
626    self.update_clock();
627
628    // Clear out the buffers
629    loop {
630      futures::select! {
631        ev = self.stream_rx.recv().fuse() => {
632          if let Ok(ev) = ev {
633            stream_flush_event!(self <- ev)
634          } else {
635            break;
636          }
637        }
638        _ = (&mut flush_timeout).fuse() => {
639          break;
640        }
641        default => {
642          break;
643        }
644      }
645    }
646
647    if let Some(fh) = self.fh.as_mut() {
648      if let Err(e) = fh.flush() {
649        tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
650      }
651
652      if let Err(e) = fh.get_mut().sync_all() {
653        tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
654      }
655    }
656
657    self.wait_tx.close();
658    tee_handle.await;
659    tracing::debug!("serf: snapshotter stream exits");
660  }
661
662  /// Used to handle a single user event
663  fn process_user_event(&mut self, e: &UserEventMessage) {
664    // Ignore old clocks
665    let ltime = e.ltime();
666    if ltime <= self.last_event_clock {
667      return;
668    }
669
670    self.last_event_clock = ltime;
671    self.try_append(SnapshotRecord::EventClock(ltime));
672  }
673
674  /// Used to handle a single query event
675  fn process_query_event(&mut self, ltime: LamportTime) {
676    // Ignore old clocks
677    if ltime <= self.last_query_clock {
678      return;
679    }
680
681    self.last_query_clock = ltime;
682    self.try_append(SnapshotRecord::QueryClock(ltime));
683  }
684
685  /// Used to handle a single member event
686  fn process_member_event(
687    &mut self,
688    e: &MemberEvent<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
689  ) {
690    match e.ty {
691      MemberEventType::Join => {
692        for m in e.members() {
693          let node = m.node();
694          self.alive_nodes.insert(node.cheap_clone());
695          self.try_append(SnapshotRecord::Alive(Cow::Borrowed(node)))
696        }
697      }
698      MemberEventType::Leave | MemberEventType::Failed => {
699        for m in e.members() {
700          let node = m.node();
701          self.alive_nodes.remove(node);
702          self.try_append(SnapshotRecord::NotAlive(Cow::Borrowed(node)));
703        }
704      }
705      _ => {}
706    }
707    self.update_clock();
708  }
709
710  /// Called periodically to check if we should udpate our
711  /// clock value. This is done after member events but should also be done
712  /// periodically due to race conditions with join and leave intents
713  fn update_clock(&mut self) {
714    let t: u64 = self.clock.time().into();
715    let last_seen = LamportTime::from(t.saturating_sub(1));
716    if last_seen > self.last_clock {
717      self.last_clock = last_seen;
718      self.try_append(SnapshotRecord::Clock(self.last_clock));
719    }
720  }
721
722  fn try_append(
723    &mut self,
724    l: SnapshotRecord<'_, T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
725  ) {
726    if let Err(e) = self.append_line(l) {
727      tracing::error!(err = %e, "serf: failed to update snapshot");
728      if self.last_attempted_compaction.elapsed() > SNAPSHOT_ERROR_RECOVERY_INTERVAL {
729        self.last_attempted_compaction = Epoch::now();
730        tracing::info!("serf: attempting compaction to recover from error...");
731        if let Err(e) = self.compact() {
732          tracing::error!(err = %e, "serf: compaction failed, will reattempt after {}s", SNAPSHOT_ERROR_RECOVERY_INTERVAL.as_secs());
733        } else {
734          tracing::info!("serf: finished compaction, successfully recovered from error state");
735        }
736      }
737    }
738  }
739
740  fn append_line(
741    &mut self,
742    l: SnapshotRecord<'_, T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
743  ) -> Result<(), SnapshotError> {
744    #[cfg(feature = "metrics")]
745    let start = crate::types::Epoch::now();
746
747    #[cfg(feature = "metrics")]
748    let metric_labels = self.metric_labels.clone();
749    #[cfg(feature = "metrics")]
750    scopeguard::defer!(
751      metrics::histogram!("serf.snapshot.append_line", metric_labels.iter())
752        .record(start.elapsed().as_millis() as f64)
753    );
754
755    let f = self.fh.as_mut().unwrap();
756    let n = l.encode::<D, _>(f).map_err(SnapshotError::Write)?;
757
758    // check if we should flush
759    if self.last_flush.elapsed() > FLUSH_INTERVAL {
760      self.last_flush = Epoch::now();
761      self
762        .fh
763        .as_mut()
764        .unwrap()
765        .flush()
766        .map_err(SnapshotError::Flush)?;
767    }
768
769    // Check if a compaction is necessary
770    self.offset += n as u64;
771    if self.offset > self.snapshot_max_size() {
772      self.compact()?;
773    }
774    Ok(())
775  }
776
777  /// Computes the maximum size and is used to force periodic compaction.
778  fn snapshot_max_size(&self) -> u64 {
779    let nodes = self.alive_nodes.len() as u64;
780    let est_size = nodes * SNAPSHOT_BYTES_PER_NODE as u64;
781    let threshold = est_size * SNAPSHOT_COMPACTION_THRESHOLD as u64;
782    threshold.max(self.min_compact_size)
783  }
784
785  /// Used to compact the snapshot once it is too large
786  fn compact(&mut self) -> Result<(), SnapshotError> {
787    #[cfg(feature = "metrics")]
788    let start = crate::types::Epoch::now();
789
790    #[cfg(feature = "metrics")]
791    let metric_labels = self.metric_labels.clone();
792    #[cfg(feature = "metrics")]
793    scopeguard::defer!(
794      metrics::histogram!("serf.snapshot.compact", metric_labels.iter())
795        .record(start.elapsed().as_millis() as f64)
796    );
797
798    // Try to open the file to new file
799    let new_path = self.path.with_extension(TMP_EXT);
800    #[cfg(unix)]
801    let fh = OpenOptions::new()
802      .create(true)
803      .write(true)
804      .truncate(true)
805      .mode(0o755)
806      .open(&new_path)
807      .map_err(SnapshotError::OpenNew)?;
808
809    #[cfg(not(unix))]
810    let fh = OpenOptions::new()
811      .create(true)
812      .write(true)
813      .truncate(true)
814      .open(&new_path)
815      .map_err(SnapshotError::OpenNew)?;
816
817    // Create a buffered writer
818    let mut buf = BufWriter::new(fh);
819
820    // Write out the live nodes
821    let mut offset = 0u64;
822    for node in self.alive_nodes.iter() {
823      offset += SnapshotRecord::Alive(Cow::Borrowed(node))
824        .encode::<D, _>(&mut buf)
825        .map_err(SnapshotError::WriteNew)? as u64;
826    }
827
828    // Write out the clocks
829    offset += SnapshotRecord::Clock(self.last_clock)
830      .encode::<D, _>(&mut buf)
831      .map_err(SnapshotError::WriteNew)? as u64;
832
833    offset += SnapshotRecord::EventClock(self.last_event_clock)
834      .encode::<D, _>(&mut buf)
835      .map_err(SnapshotError::WriteNew)? as u64;
836
837    offset += SnapshotRecord::QueryClock(self.last_query_clock)
838      .encode::<D, _>(&mut buf)
839      .map_err(SnapshotError::WriteNew)? as u64;
840
841    // Flush the new snapshot
842    buf.flush().map_err(SnapshotError::Flush)?;
843
844    // Sync the new snapshot
845    buf.get_ref().sync_all().map_err(SnapshotError::Sync)?;
846    drop(buf);
847
848    // We now need to swap the old snapshot file with the new snapshot.
849    // Turns out, Windows won't let us rename the files if we have
850    // open handles to them or if the destination already exists. This
851    // means we are forced to close the existing handles, delete the
852    // old file, move the new one in place, and then re-open the file
853    // handles.
854
855    // Flush the existing snapshot, ignoring errors since we will
856    // delete it momentarily.
857    let mut old = self.fh.take().unwrap();
858    let _ = old.flush();
859    drop(old);
860
861    // Delete the old file
862    if let Err(e) = std::fs::remove_file(&self.path) {
863      if !matches!(e.kind(), std::io::ErrorKind::NotFound) {
864        return Err(SnapshotError::Remove(e));
865      }
866    }
867
868    // Move the new file into place
869    std::fs::rename(&new_path, &self.path).map_err(SnapshotError::Install)?;
870
871    // Open the new snapshot
872    #[cfg(unix)]
873    let fh = OpenOptions::new()
874      .create(true)
875      .append(true)
876      .read(true)
877      .mode(0o755)
878      .open(&self.path)
879      .map_err(SnapshotError::Open)?;
880
881    #[cfg(not(unix))]
882    let fh = OpenOptions::new()
883      .create(true)
884      .append(true)
885      .read(true)
886      .write(true)
887      .open(&self.path)
888      .map_err(SnapshotError::Open)?;
889
890    self.fh = Some(BufWriter::new(fh));
891    self.offset = offset;
892    self.last_flush = Epoch::now();
893    Ok(())
894  }
895}