serf_core/
snapshot.rs

1use std::{
2  borrow::Cow,
3  collections::HashSet,
4  fs::{File, OpenOptions},
5  io::{BufReader, BufWriter, Read, Seek, Write},
6  mem,
7  path::PathBuf,
8  time::Duration,
9};
10
11#[cfg(unix)]
12use std::os::unix::prelude::OpenOptionsExt;
13
14use crate::types::UserEventMessage;
15use async_channel::{Receiver, Sender};
16use byteorder::{LittleEndian, ReadBytesExt};
17use futures::FutureExt;
18use memberlist_core::{
19  CheapClone,
20  agnostic_lite::{AsyncSpawner, RuntimeLite},
21  bytes::{BufMut, BytesMut},
22  proto::{Data, MaybeResolvedAddress, TinyVec},
23  tracing,
24  transport::{Id, Node, Transport},
25};
26use rand::seq::SliceRandom;
27
28use crate::{
29  delegate::Delegate,
30  event::{CrateEvent, MemberEvent, MemberEventType},
31  invalid_data_io_error,
32  types::{Epoch, LamportClock, LamportTime},
33};
34
35/// How often we force a flush of the snapshot file
36const FLUSH_INTERVAL: Duration = Duration::from_millis(500);
37
38/// How often we fetch the current lamport time of the cluster and write to the snapshot file
39const CLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
40
41/// The extention we use for the temporary file during compaction
42const TMP_EXT: &str = "compact";
43
44/// How often we attempt to recover from
45/// errors writing to the snapshot file.
46const SNAPSHOT_ERROR_RECOVERY_INTERVAL: Duration = Duration::from_secs(30);
47
48/// The size of the event buffers between Serf and the
49/// consuming application. If this is exhausted we will block Serf and Memberlist.
50const EVENT_CH_SIZE: usize = 2048;
51
52/// The time limit to write pending events to the snapshot during a shutdown
53const SHUTDOWN_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);
54
55/// An estimated bytes per node to snapshot
56const SNAPSHOT_BYTES_PER_NODE: usize = 128;
57
58/// The threshold we apply to
59/// the snapshot size estimate (nodes * bytes per node) before compacting.
60const SNAPSHOT_COMPACTION_THRESHOLD: usize = 2;
61
62/// Errors that can occur while interacting with snapshots
63#[derive(Debug, thiserror::Error)]
64pub enum SnapshotError {
65  /// Returned when opening a snapshot fails
66  #[error("failed to open snapshot: {0}")]
67  Open(std::io::Error),
68  /// Returned when opening a new snapshot fails
69  #[error("failed to open new snapshot: {0}")]
70  OpenNew(std::io::Error),
71  /// Returned when flush new snapshot fails
72  #[error("failed to flush new snapshot: {0}")]
73  FlushNew(std::io::Error),
74  /// Returned when flush snapshot fails
75  #[error("failed to flush snapshot: {0}")]
76  Flush(std::io::Error),
77  /// Returned when fsync snapshot fails
78  #[error("failed to fsync snapshot: {0}")]
79  Sync(std::io::Error),
80  /// Returned when stat snapshot fails
81  #[error("failed to stat snapshot: {0}")]
82  Stat(std::io::Error),
83  /// Returned when remove old snapshot fails
84  #[error("failed to remove old snapshot: {0}")]
85  Remove(std::io::Error),
86  /// Returned when installing a new snapshot fails
87  #[error("failed to install new snapshot: {0}")]
88  Install(std::io::Error),
89  /// Returned when writing to a new snapshot fails
90  #[error("failed to write to new snapshot: {0}")]
91  WriteNew(std::io::Error),
92  /// Returned when writing to a snapshot fails
93  #[error("failed to write to snapshot: {0}")]
94  Write(std::io::Error),
95  /// Returned when seek to start of a snapshot fails
96  #[error("failed to seek to beginning of snapshot: {0}")]
97  SeekStart(std::io::Error),
98  /// Returned when seek to end of a snapshot fails
99  #[error("failed to seek to end of snapshot: {0}")]
100  SeekEnd(std::io::Error),
101  /// Returned when replaying a snapshot fails
102  #[error("failed to replay snapshot: {0}")]
103  Replay(std::io::Error),
104  /// Returned when fail to decode snapshot record type.
105  #[error(transparent)]
106  UnknownRecordType(#[from] UnknownRecordType),
107}
108
109/// UnknownRecordType is used to indicate that we encountered an unknown
110/// record type while reading a snapshot file.
111#[derive(Debug, thiserror::Error)]
112#[error("unrecognized snapshot record type: {0}")]
113pub struct UnknownRecordType(u8);
114
115#[derive(Debug, Copy, Clone, PartialEq, Eq)]
116#[repr(u8)]
117enum SnapshotRecordType {
118  Alive = 0,
119  NotAlive = 1,
120  Clock = 2,
121  EventClock = 3,
122  QueryClock = 4,
123  Coordinate = 5,
124  Leave = 6,
125  Comment = 7,
126}
127
128impl TryFrom<u8> for SnapshotRecordType {
129  type Error = UnknownRecordType;
130
131  fn try_from(value: u8) -> Result<Self, Self::Error> {
132    match value {
133      0 => Ok(Self::Alive),
134      1 => Ok(Self::NotAlive),
135      2 => Ok(Self::Clock),
136      3 => Ok(Self::EventClock),
137      4 => Ok(Self::QueryClock),
138      5 => Ok(Self::Coordinate),
139      6 => Ok(Self::Leave),
140      7 => Ok(Self::Comment),
141      v => Err(UnknownRecordType(v)),
142    }
143  }
144}
145
146#[allow(dead_code)]
147enum SnapshotRecord<'a, I: Clone, A: Clone> {
148  Alive(Cow<'a, Node<I, A>>),
149  NotAlive(Cow<'a, Node<I, A>>),
150  Clock(LamportTime),
151  EventClock(LamportTime),
152  QueryClock(LamportTime),
153  Coordinate,
154  Leave,
155  Comment,
156}
157
158const MAX_INLINED_BYTES: usize = 64;
159
160macro_rules! encode {
161  ($w:ident.$node: ident::$status: ident) => {{
162    let node = $node.as_ref();
163    let encoded_node_len = node.encoded_len();
164    let encoded_len = 4 + 1 + encoded_node_len;
165    if encoded_len <= MAX_INLINED_BYTES {
166      let mut buf = [0u8; MAX_INLINED_BYTES];
167      buf[0] = Self::$status;
168      buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes());
169      node.encode(&mut buf[5..]).map_err(invalid_data_io_error)?;
170      $w.write_all(&buf[..encoded_len]).map(|_| encoded_len)
171    } else {
172      let mut buf = BytesMut::with_capacity(encoded_len);
173      buf.put_u8(Self::$status);
174      buf.put_u32_le(encoded_node_len as u32);
175      node.encode(&mut buf).map_err(invalid_data_io_error)?;
176      $w.write_all(&buf).map(|_| encoded_len)
177    }
178  }};
179  ($w:ident.$t: ident($status: ident)) => {{
180    const N: usize = mem::size_of::<u8>() + mem::size_of::<u64>();
181    let mut data = [0u8; N];
182    data[0] = Self::$status;
183    data[1..N].copy_from_slice(&$t.to_le_bytes());
184    $w.write_all(&data).map(|_| N)
185  }};
186  ($w:ident.$ident: ident) => {{ $w.write_all(&[Self::$ident]).map(|_| 1) }};
187}
188
189impl<I, A> SnapshotRecord<'_, I, A>
190where
191  I: Id + Data,
192  A: CheapClone + Data + Send + Sync + 'static,
193{
194  const ALIVE: u8 = 0;
195  const NOT_ALIVE: u8 = 1;
196  const CLOCK: u8 = 2;
197  const EVENT_CLOCK: u8 = 3;
198  const QUERY_CLOCK: u8 = 4;
199  const COORDINATE: u8 = 5;
200  const LEAVE: u8 = 6;
201  const COMMENT: u8 = 7;
202
203  fn encode<W: Write>(&self, w: &mut W) -> std::io::Result<usize> {
204    match self {
205      Self::Alive(id) => encode!(w.id::ALIVE),
206      Self::NotAlive(id) => encode!(w.id::NOT_ALIVE),
207      Self::Clock(t) => encode!(w.t(CLOCK)),
208      Self::EventClock(t) => encode!(w.t(EVENT_CLOCK)),
209      Self::QueryClock(t) => encode!(w.t(QUERY_CLOCK)),
210      Self::Coordinate => encode!(w.COORDINATE),
211      Self::Leave => encode!(w.LEAVE),
212      Self::Comment => encode!(w.COMMENT),
213    }
214  }
215}
216
217#[viewit::viewit]
218pub(crate) struct ReplayResult<I, A> {
219  alive_nodes: HashSet<Node<I, A>>,
220  last_clock: LamportTime,
221  last_event_clock: LamportTime,
222  last_query_clock: LamportTime,
223  offset: u64,
224  fh: File,
225  path: PathBuf,
226}
227
228pub(crate) fn open_and_replay_snapshot<
229  I: Id + Data,
230  A: CheapClone + Data + core::hash::Hash + Eq + Send + Sync + 'static,
231  P: AsRef<std::path::Path>,
232>(
233  p: &P,
234  rejoin_after_leave: bool,
235) -> Result<ReplayResult<I, A>, SnapshotError> {
236  // Try to open the file
237  #[cfg(unix)]
238  let fh = OpenOptions::new()
239    .create(true)
240    .append(true)
241    .read(true)
242    .mode(0o644)
243    .open(p)
244    .map_err(SnapshotError::Open)?;
245  #[cfg(not(unix))]
246  let fh = OpenOptions::new()
247    .create(true)
248    .append(true)
249    .read(true)
250    .write(true)
251    .open(p)
252    .map_err(SnapshotError::Open)?;
253
254  // Determine the offset
255  let offset = fh.metadata().map_err(SnapshotError::Stat)?.len();
256
257  // Read each line
258  let mut reader = BufReader::new(fh);
259  let mut buf = Vec::new();
260  let mut alive_nodes = HashSet::new();
261  let mut last_clock = LamportTime::ZERO;
262  let mut last_event_clock = LamportTime::ZERO;
263  let mut last_query_clock = LamportTime::ZERO;
264
265  loop {
266    let kind = match reader.read_u8() {
267      Ok(b) => SnapshotRecordType::try_from(b)?,
268      Err(e) => {
269        if e.kind() == std::io::ErrorKind::UnexpectedEof {
270          break;
271        }
272        return Err(SnapshotError::Replay(e));
273      }
274    };
275
276    match kind {
277      SnapshotRecordType::Alive => {
278        let len = reader
279          .read_u32::<LittleEndian>()
280          .map_err(SnapshotError::Replay)? as usize;
281        buf.resize(len, 0);
282        reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
283
284        let (_, node) = <Node<I, A> as Data>::decode(&buf)
285          .map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
286        alive_nodes.insert(node);
287      }
288      SnapshotRecordType::NotAlive => {
289        let len = reader
290          .read_u32::<LittleEndian>()
291          .map_err(SnapshotError::Replay)? as usize;
292        buf.resize(len, 0);
293        reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
294
295        let (_, node) = <Node<I, A> as Data>::decode(&buf)
296          .map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
297        alive_nodes.remove(&node);
298      }
299      SnapshotRecordType::Clock => {
300        let t = reader
301          .read_u64::<LittleEndian>()
302          .map_err(SnapshotError::Replay)?;
303        last_clock = LamportTime::new(t);
304      }
305      SnapshotRecordType::EventClock => {
306        let t = reader
307          .read_u64::<LittleEndian>()
308          .map_err(SnapshotError::Replay)?;
309        last_event_clock = LamportTime::new(t);
310      }
311      SnapshotRecordType::QueryClock => {
312        let t = reader
313          .read_u64::<LittleEndian>()
314          .map_err(SnapshotError::Replay)?;
315        last_query_clock = LamportTime::new(t);
316      }
317      SnapshotRecordType::Coordinate => continue,
318      SnapshotRecordType::Leave => {
319        // Ignore a leave if we plan on re-joining
320        if rejoin_after_leave {
321          tracing::info!("serf: ignoring previous leave in snapshot");
322          continue;
323        }
324        alive_nodes.clear();
325        last_clock = LamportTime::ZERO;
326        last_event_clock = LamportTime::ZERO;
327        last_query_clock = LamportTime::ZERO;
328      }
329      SnapshotRecordType::Comment => continue,
330    }
331  }
332
333  // Seek to the end
334  let mut f = reader.into_inner();
335
336  f.seek(std::io::SeekFrom::End(0))
337    .map(|_| ReplayResult {
338      alive_nodes,
339      last_clock,
340      last_event_clock,
341      last_query_clock,
342      offset,
343      fh: f,
344      path: p.as_ref().to_path_buf(),
345    })
346    .map_err(SnapshotError::SeekEnd)
347}
348
349pub(crate) struct SnapshotHandle {
350  wait_rx: Receiver<()>,
351  shutdown_rx: Receiver<()>,
352  leave_tx: Sender<()>,
353}
354
355impl SnapshotHandle {
356  /// Used to wait until the snapshotter finishes shut down
357  pub(crate) async fn wait(&self) {
358    let _ = self.wait_rx.recv().await;
359  }
360
361  /// Used to remove known nodes to prevent a restart from
362  /// causing a join. Otherwise nodes will re-join after leaving!
363  pub(crate) async fn leave(&self) {
364    futures::select! {
365      _ = self.leave_tx.send(()).fuse() => {},
366      _ = self.shutdown_rx.recv().fuse() => {},
367    }
368  }
369}
370
371/// Responsible for ingesting events and persisting
372/// them to disk, and providing a recovery mechanism at start time.
373pub(crate) struct Snapshot<T, D>
374where
375  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
376  T: Transport,
377{
378  alive_nodes: HashSet<Node<T::Id, T::ResolvedAddress>>,
379  clock: LamportClock,
380  fh: Option<BufWriter<File>>,
381  last_flush: Epoch,
382  last_clock: LamportTime,
383  last_event_clock: LamportTime,
384  last_query_clock: LamportTime,
385  leave_rx: Receiver<()>,
386  leaving: bool,
387  min_compact_size: u64,
388  path: PathBuf,
389  offset: u64,
390  rejoin_after_leave: bool,
391  stream_rx: Receiver<CrateEvent<T, D>>,
392  shutdown_rx: Receiver<()>,
393  wait_tx: Sender<()>,
394  last_attempted_compaction: Epoch,
395  #[cfg(feature = "metrics")]
396  metric_labels: std::sync::Arc<memberlist_core::proto::MetricLabels>,
397}
398
399// flushEvent is used to handle writing out an event
400macro_rules! stream_flush_event {
401  ($this:ident <- $event:ident) => {{
402    // Stop recording events after a leave is issued
403    if $this.leaving {
404      break;
405    }
406
407    match &$event {
408      CrateEvent::Member(e) => $this.process_member_event(e),
409      CrateEvent::User(e) => $this.process_user_event(e),
410      CrateEvent::Query(e) => $this.process_query_event(e.ltime),
411      CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime),
412    }
413  }};
414}
415
416macro_rules! tee_stream_flush_event {
417  ($stream_tx:ident <- $event:ident -> $out_tx:ident) => {{
418    // Forward to the internal stream, do not block
419    futures::select! {
420      _ = $stream_tx.send($event.clone()).fuse() => {}
421      default => {}
422    }
423
424    // Forward the event immediately, do not block
425    futures::select! {
426      _ = $out_tx.send($event).fuse() => {}
427      default => {}
428    }
429  }};
430}
431
432impl<D, T> Snapshot<T, D>
433where
434  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
435  T: Transport,
436{
437  #[allow(clippy::type_complexity)]
438  pub(crate) fn from_replay_result(
439    replay_result: ReplayResult<T::Id, T::ResolvedAddress>,
440    min_compact_size: u64,
441    rejoin_after_leave: bool,
442    clock: LamportClock,
443    out_tx: Sender<CrateEvent<T, D>>,
444    shutdown_rx: Receiver<()>,
445    #[cfg(feature = "metrics")] metric_labels: std::sync::Arc<memberlist_core::proto::MetricLabels>,
446  ) -> Result<
447    (
448      Sender<CrateEvent<T, D>>,
449      TinyVec<Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
450      SnapshotHandle,
451    ),
452    SnapshotError,
453  > {
454    let (in_tx, in_rx) = async_channel::bounded(EVENT_CH_SIZE);
455    let (stream_tx, stream_rx) = async_channel::bounded(EVENT_CH_SIZE);
456    let (leave_tx, leave_rx) = async_channel::bounded(1);
457    let (wait_tx, wait_rx) = async_channel::bounded(1);
458
459    let ReplayResult {
460      alive_nodes,
461      last_clock,
462      last_event_clock,
463      last_query_clock,
464      offset,
465      fh,
466      path,
467    } = replay_result;
468
469    // Create the snapshotter
470    let this = Self {
471      alive_nodes,
472      clock,
473      fh: Some(BufWriter::new(fh)),
474      last_flush: Epoch::now(),
475      last_clock,
476      last_event_clock,
477      last_query_clock,
478      leave_rx,
479      leaving: false,
480      min_compact_size,
481      path,
482      offset,
483      rejoin_after_leave,
484      stream_rx,
485      shutdown_rx: shutdown_rx.clone(),
486      wait_tx,
487      last_attempted_compaction: Epoch::now(),
488      #[cfg(feature = "metrics")]
489      metric_labels,
490    };
491
492    let mut alive_nodes = this
493      .alive_nodes
494      .iter()
495      .map(|n| {
496        let id = n.id().cheap_clone();
497        let addr = n.address().cheap_clone();
498        Node::new(id, MaybeResolvedAddress::resolved(addr))
499      })
500      .collect::<TinyVec<_>>();
501    alive_nodes.shuffle(&mut rand::rng());
502
503    // Start handling new commands
504    let handle = <T::Runtime as RuntimeLite>::spawn(Self::tee_stream(
505      in_rx,
506      stream_tx,
507      out_tx,
508      shutdown_rx.clone(),
509    ));
510    <T::Runtime as RuntimeLite>::spawn_detach(this.stream(handle));
511
512    Ok((
513      in_tx,
514      alive_nodes,
515      SnapshotHandle {
516        wait_rx,
517        shutdown_rx,
518        leave_tx,
519      },
520    ))
521  }
522
523  /// A long running routine that is used to copy events
524  /// to the output channel and the internal event handler.
525  async fn tee_stream(
526    in_rx: Receiver<CrateEvent<T, D>>,
527    stream_tx: Sender<CrateEvent<T, D>>,
528    out_tx: Sender<CrateEvent<T, D>>,
529    shutdown_rx: Receiver<()>,
530  ) {
531    loop {
532      futures::select! {
533        ev = in_rx.recv().fuse() => {
534          if let Ok(ev) = ev {
535            tee_stream_flush_event!(stream_tx <- ev -> out_tx)
536          } else {
537            break;
538          }
539        }
540        _ = shutdown_rx.recv().fuse() => {
541          break;
542        }
543      }
544    }
545
546    // Drain any remaining events before exiting
547    loop {
548      futures::select! {
549        ev = in_rx.recv().fuse() => {
550          if let Ok(ev) = ev {
551            tee_stream_flush_event!(stream_tx <- ev -> out_tx)
552          } else {
553            break;
554          }
555        }
556        default => break,
557      }
558    }
559    tracing::debug!("serf: snapshotter tee stream exits");
560  }
561
562  fn handle_leave(&mut self) {
563    self.leaving = true;
564
565    // If we plan to re-join, keep our state
566    if !self.rejoin_after_leave {
567      self.alive_nodes.clear();
568    }
569    self.try_append(SnapshotRecord::Leave);
570    if let Some(fh) = self.fh.as_mut() {
571      if let Err(e) = fh.flush() {
572        tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
573      }
574
575      if let Err(e) = fh.get_mut().sync_all() {
576        tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
577      }
578    }
579  }
580
581  /// Long running routine that is used to handle events
582  async fn stream(
583    mut self,
584    tee_handle: <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>,
585  ) {
586    let mut clock_ticker = <T::Runtime as RuntimeLite>::interval(CLOCK_UPDATE_INTERVAL);
587
588    loop {
589      futures::select! {
590        signal = self.leave_rx.recv().fuse() => {
591          if signal.is_ok() {
592            self.handle_leave();
593          }
594        }
595        ev = self.stream_rx.recv().fuse() => {
596          if let Ok(ev) = ev {
597            stream_flush_event!(self <- ev)
598          } else {
599            break;
600          }
601        }
602        _ = futures::StreamExt::next(&mut clock_ticker).fuse() => {
603          self.update_clock();
604        }
605        _ = self.shutdown_rx.recv().fuse() => {
606          break;
607        }
608      }
609    }
610
611    if self.leave_rx.try_recv().is_ok() {
612      self.handle_leave();
613    }
614
615    // Setup a timeout
616    let flush_timeout = <T::Runtime as RuntimeLite>::sleep(SHUTDOWN_FLUSH_TIMEOUT);
617    futures::pin_mut!(flush_timeout);
618
619    // snapshot the clock
620    self.update_clock();
621
622    // Clear out the buffers
623    loop {
624      futures::select! {
625        ev = self.stream_rx.recv().fuse() => {
626          if let Ok(ev) = ev {
627            stream_flush_event!(self <- ev)
628          } else {
629            break;
630          }
631        }
632        _ = (&mut flush_timeout).fuse() => {
633          break;
634        }
635        default => {
636          break;
637        }
638      }
639    }
640
641    if let Some(fh) = self.fh.as_mut() {
642      if let Err(e) = fh.flush() {
643        tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
644      }
645
646      if let Err(e) = fh.get_mut().sync_all() {
647        tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
648      }
649    }
650
651    self.wait_tx.close();
652    if let Err(e) = tee_handle.await {
653      tracing::error!(target="serf", err=%e, "failed to wait for tee stream to exit");
654    }
655    tracing::debug!("serf: snapshotter stream exits");
656  }
657
658  /// Used to handle a single user event
659  fn process_user_event(&mut self, e: &UserEventMessage) {
660    // Ignore old clocks
661    let ltime = e.ltime();
662    if ltime <= self.last_event_clock {
663      return;
664    }
665
666    self.last_event_clock = ltime;
667    self.try_append(SnapshotRecord::EventClock(ltime));
668  }
669
670  /// Used to handle a single query event
671  fn process_query_event(&mut self, ltime: LamportTime) {
672    // Ignore old clocks
673    if ltime <= self.last_query_clock {
674      return;
675    }
676
677    self.last_query_clock = ltime;
678    self.try_append(SnapshotRecord::QueryClock(ltime));
679  }
680
681  /// Used to handle a single member event
682  fn process_member_event(&mut self, e: &MemberEvent<T::Id, T::ResolvedAddress>) {
683    match e.ty {
684      MemberEventType::Join => {
685        for m in e.members() {
686          let node = m.node();
687          self.alive_nodes.insert(node.cheap_clone());
688          self.try_append(SnapshotRecord::Alive(Cow::Borrowed(node)))
689        }
690      }
691      MemberEventType::Leave | MemberEventType::Failed => {
692        for m in e.members() {
693          let node = m.node();
694          self.alive_nodes.remove(node);
695          self.try_append(SnapshotRecord::NotAlive(Cow::Borrowed(node)));
696        }
697      }
698      _ => {}
699    }
700    self.update_clock();
701  }
702
703  /// Called periodically to check if we should udpate our
704  /// clock value. This is done after member events but should also be done
705  /// periodically due to race conditions with join and leave intents
706  fn update_clock(&mut self) {
707    let t: u64 = self.clock.time().into();
708    let last_seen = LamportTime::from(t.saturating_sub(1));
709    if last_seen > self.last_clock {
710      self.last_clock = last_seen;
711      self.try_append(SnapshotRecord::Clock(self.last_clock));
712    }
713  }
714
715  fn try_append(&mut self, l: SnapshotRecord<'_, T::Id, T::ResolvedAddress>) {
716    if let Err(e) = self.append_line(l) {
717      tracing::error!(err = %e, "serf: failed to update snapshot");
718      if self.last_attempted_compaction.elapsed() > SNAPSHOT_ERROR_RECOVERY_INTERVAL {
719        self.last_attempted_compaction = Epoch::now();
720        tracing::info!("serf: attempting compaction to recover from error...");
721        if let Err(e) = self.compact() {
722          tracing::error!(err = %e, "serf: compaction failed, will reattempt after {}s", SNAPSHOT_ERROR_RECOVERY_INTERVAL.as_secs());
723        } else {
724          tracing::info!("serf: finished compaction, successfully recovered from error state");
725        }
726      }
727    }
728  }
729
730  fn append_line(
731    &mut self,
732    l: SnapshotRecord<'_, T::Id, T::ResolvedAddress>,
733  ) -> Result<(), SnapshotError> {
734    #[cfg(feature = "metrics")]
735    let start = crate::types::Epoch::now();
736
737    #[cfg(feature = "metrics")]
738    let metric_labels = self.metric_labels.clone();
739    #[cfg(feature = "metrics")]
740    scopeguard::defer!(
741      metrics::histogram!("serf.snapshot.append_line", metric_labels.iter())
742        .record(start.elapsed().as_millis() as f64)
743    );
744
745    let f = self.fh.as_mut().unwrap();
746    let n = l.encode(f).map_err(SnapshotError::Write)?;
747
748    // check if we should flush
749    if self.last_flush.elapsed() > FLUSH_INTERVAL {
750      self.last_flush = Epoch::now();
751      self
752        .fh
753        .as_mut()
754        .unwrap()
755        .flush()
756        .map_err(SnapshotError::Flush)?;
757    }
758
759    // Check if a compaction is necessary
760    self.offset += n as u64;
761    if self.offset > self.snapshot_max_size() {
762      self.compact()?;
763    }
764    Ok(())
765  }
766
767  /// Computes the maximum size and is used to force periodic compaction.
768  fn snapshot_max_size(&self) -> u64 {
769    let nodes = self.alive_nodes.len() as u64;
770    let est_size = nodes * SNAPSHOT_BYTES_PER_NODE as u64;
771    let threshold = est_size * SNAPSHOT_COMPACTION_THRESHOLD as u64;
772    threshold.max(self.min_compact_size)
773  }
774
775  /// Used to compact the snapshot once it is too large
776  fn compact(&mut self) -> Result<(), SnapshotError> {
777    #[cfg(feature = "metrics")]
778    let start = crate::types::Epoch::now();
779
780    #[cfg(feature = "metrics")]
781    let metric_labels = self.metric_labels.clone();
782    #[cfg(feature = "metrics")]
783    scopeguard::defer!(
784      metrics::histogram!("serf.snapshot.compact", metric_labels.iter())
785        .record(start.elapsed().as_millis() as f64)
786    );
787
788    // Try to open the file to new file
789    let new_path = self.path.with_extension(TMP_EXT);
790    #[cfg(unix)]
791    let fh = OpenOptions::new()
792      .create(true)
793      .write(true)
794      .truncate(true)
795      .mode(0o755)
796      .open(&new_path)
797      .map_err(SnapshotError::OpenNew)?;
798
799    #[cfg(not(unix))]
800    let fh = OpenOptions::new()
801      .create(true)
802      .write(true)
803      .truncate(true)
804      .open(&new_path)
805      .map_err(SnapshotError::OpenNew)?;
806
807    // Create a buffered writer
808    let mut buf = BufWriter::new(fh);
809
810    // Write out the live nodes
811    let mut offset = 0u64;
812    for node in self.alive_nodes.iter() {
813      offset += SnapshotRecord::Alive(Cow::Borrowed(node))
814        .encode(&mut buf)
815        .map_err(SnapshotError::WriteNew)? as u64;
816    }
817
818    // Write out the clocks
819    offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::Clock(self.last_clock)
820      .encode(&mut buf)
821      .map_err(SnapshotError::WriteNew)? as u64;
822
823    offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::EventClock(self.last_event_clock)
824      .encode(&mut buf)
825      .map_err(SnapshotError::WriteNew)? as u64;
826
827    offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::QueryClock(self.last_query_clock)
828      .encode(&mut buf)
829      .map_err(SnapshotError::WriteNew)? as u64;
830
831    // Flush the new snapshot
832    buf.flush().map_err(SnapshotError::Flush)?;
833
834    // Sync the new snapshot
835    buf.get_ref().sync_all().map_err(SnapshotError::Sync)?;
836    drop(buf);
837
838    // We now need to swap the old snapshot file with the new snapshot.
839    // Turns out, Windows won't let us rename the files if we have
840    // open handles to them or if the destination already exists. This
841    // means we are forced to close the existing handles, delete the
842    // old file, move the new one in place, and then re-open the file
843    // handles.
844
845    // Flush the existing snapshot, ignoring errors since we will
846    // delete it momentarily.
847    let mut old = self.fh.take().unwrap();
848    let _ = old.flush();
849    drop(old);
850
851    // Delete the old file
852    if let Err(e) = std::fs::remove_file(&self.path) {
853      if !matches!(e.kind(), std::io::ErrorKind::NotFound) {
854        return Err(SnapshotError::Remove(e));
855      }
856    }
857
858    // Move the new file into place
859    std::fs::rename(&new_path, &self.path).map_err(SnapshotError::Install)?;
860
861    // Open the new snapshot
862    #[cfg(unix)]
863    let fh = OpenOptions::new()
864      .create(true)
865      .append(true)
866      .read(true)
867      .mode(0o755)
868      .open(&self.path)
869      .map_err(SnapshotError::Open)?;
870
871    #[cfg(not(unix))]
872    let fh = OpenOptions::new()
873      .create(true)
874      .append(true)
875      .read(true)
876      .write(true)
877      .open(&self.path)
878      .map_err(SnapshotError::Open)?;
879
880    self.fh = Some(BufWriter::new(fh));
881    self.offset = offset;
882    self.last_flush = Epoch::now();
883    Ok(())
884  }
885}