1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{File, OpenOptions},
5 io::{BufReader, BufWriter, Read, Seek, Write},
6 mem,
7 path::PathBuf,
8 time::Duration,
9};
10
11#[cfg(unix)]
12use std::os::unix::prelude::OpenOptionsExt;
13
14use async_channel::{Receiver, Sender};
15use byteorder::{LittleEndian, ReadBytesExt};
16use futures::FutureExt;
17use memberlist_core::{
18 agnostic_lite::{AsyncSpawner, RuntimeLite},
19 bytes::{BufMut, BytesMut},
20 tracing,
21 transport::{AddressResolver, Id, MaybeResolvedAddress, Node, Transport},
22 types::TinyVec,
23 CheapClone,
24};
25use rand::seq::SliceRandom;
26use serf_types::UserEventMessage;
27
28use crate::{
29 delegate::{Delegate, TransformDelegate},
30 event::{CrateEvent, MemberEvent, MemberEventType},
31 invalid_data_io_error,
32 types::{Epoch, LamportClock, LamportTime},
33};
34
/// Minimum time between flushes of the buffered snapshot writer while appending.
const FLUSH_INTERVAL: Duration = Duration::from_millis(500);

/// Period of the ticker that persists the Lamport clock from the stream task.
const CLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(500);

/// Extension used for the temporary file a compaction is written to.
const TMP_EXT: &str = "compact";

/// Minimum time between recovery compactions triggered by failed appends.
const SNAPSHOT_ERROR_RECOVERY_INTERVAL: Duration = Duration::from_secs(30);

/// Capacity of the bounded event channels feeding the snapshotter.
const EVENT_CH_SIZE: usize = 2048;

/// Upper bound on how long queued events are drained during shutdown.
const SHUTDOWN_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);

/// Estimated on-disk bytes per alive node, used to size the compaction threshold.
const SNAPSHOT_BYTES_PER_NODE: usize = 128;

/// Multiple of the estimated snapshot size that may accumulate before compacting.
const SNAPSHOT_COMPACTION_THRESHOLD: usize = 2;
61
62#[derive(Debug, thiserror::Error)]
64pub enum SnapshotError {
65 #[error("failed to open snapshot: {0}")]
67 Open(std::io::Error),
68 #[error("failed to open new snapshot: {0}")]
70 OpenNew(std::io::Error),
71 #[error("failed to flush new snapshot: {0}")]
73 FlushNew(std::io::Error),
74 #[error("failed to flush snapshot: {0}")]
76 Flush(std::io::Error),
77 #[error("failed to fsync snapshot: {0}")]
79 Sync(std::io::Error),
80 #[error("failed to stat snapshot: {0}")]
82 Stat(std::io::Error),
83 #[error("failed to remove old snapshot: {0}")]
85 Remove(std::io::Error),
86 #[error("failed to install new snapshot: {0}")]
88 Install(std::io::Error),
89 #[error("failed to write to new snapshot: {0}")]
91 WriteNew(std::io::Error),
92 #[error("failed to write to snapshot: {0}")]
94 Write(std::io::Error),
95 #[error("failed to seek to beginning of snapshot: {0}")]
97 SeekStart(std::io::Error),
98 #[error("failed to seek to end of snapshot: {0}")]
100 SeekEnd(std::io::Error),
101 #[error("failed to replay snapshot: {0}")]
103 Replay(std::io::Error),
104 #[error(transparent)]
106 UnknownRecordType(#[from] UnknownRecordType),
107}
108
109#[derive(Debug, thiserror::Error)]
112#[error("unrecognized snapshot record type: {0}")]
113pub struct UnknownRecordType(u8);
114
115#[derive(Debug, Copy, Clone, PartialEq, Eq)]
116#[repr(u8)]
117enum SnapshotRecordType {
118 Alive = 0,
119 NotAlive = 1,
120 Clock = 2,
121 EventClock = 3,
122 QueryClock = 4,
123 Coordinate = 5,
124 Leave = 6,
125 Comment = 7,
126}
127
128impl TryFrom<u8> for SnapshotRecordType {
129 type Error = UnknownRecordType;
130
131 fn try_from(value: u8) -> Result<Self, Self::Error> {
132 match value {
133 0 => Ok(Self::Alive),
134 1 => Ok(Self::NotAlive),
135 2 => Ok(Self::Clock),
136 3 => Ok(Self::EventClock),
137 4 => Ok(Self::QueryClock),
138 5 => Ok(Self::Coordinate),
139 6 => Ok(Self::Leave),
140 7 => Ok(Self::Comment),
141 v => Err(UnknownRecordType(v)),
142 }
143 }
144}
145
146#[allow(dead_code)]
147enum SnapshotRecord<'a, I: Clone, A: Clone> {
148 Alive(Cow<'a, Node<I, A>>),
149 NotAlive(Cow<'a, Node<I, A>>),
150 Clock(LamportTime),
151 EventClock(LamportTime),
152 QueryClock(LamportTime),
153 Coordinate,
154 Leave,
155 Comment,
156}
157
/// Largest node record, in bytes including its 5-byte header, that is built in
/// a stack buffer before being written; larger records use a heap buffer.
const MAX_INLINED_BYTES: usize = 64;

/// Writes a `SnapshotRecord` to a writer. Expanded only inside
/// `SnapshotRecord::encode`, where `T` is the `TransformDelegate` and `Self`
/// provides the tag constants. Every arm evaluates to
/// `std::io::Result<usize>`, the number of bytes written.
macro_rules! encode {
  // Node record: [tag: u8][len: u32 LE][node bytes encoded by `T`: len bytes].
  ($w:ident.$node: ident::$status: ident) => {{
    let node = $node.as_ref();
    let encoded_node_len = T::node_encoded_len(node);
    let encoded_len = 4 + 1 + encoded_node_len;
    if encoded_len <= MAX_INLINED_BYTES {
      let mut buf = [0u8; MAX_INLINED_BYTES];
      buf[0] = Self::$status;
      buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes());
      T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?;
      $w.write_all(&buf[..encoded_len]).map(|_| encoded_len)
    } else {
      let mut buf = BytesMut::with_capacity(encoded_len);
      buf.put_u8(Self::$status);
      buf.put_u32_le(encoded_node_len as u32);
      // Size the buffer to the full record first, then encode into the
      // payload slice exactly like the inline branch does. Handing
      // `&mut BytesMut` over directly would, if `encode_node` takes a byte
      // slice, expose only the 5 header bytes already written (a `BytesMut`
      // derefs to its length, not its capacity).
      buf.resize(encoded_len, 0);
      T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?;
      $w.write_all(&buf).map(|_| encoded_len)
    }
  }};
  // Clock record: [tag: u8][time: u64 LE].
  ($w:ident.$t: ident($status: ident)) => {{
    const N: usize = mem::size_of::<u8>() + mem::size_of::<u64>();
    let mut data = [0u8; N];
    data[0] = Self::$status;
    data[1..N].copy_from_slice(&$t.to_le_bytes());
    $w.write_all(&data).map(|_| N)
  }};
  // Tag-only record: [tag: u8].
  ($w:ident.$ident: ident) => {{
    $w.write_all(&[Self::$ident]).map(|_| 1)
  }};
}
190
191impl<I, A> SnapshotRecord<'_, I, A>
192where
193 I: Id,
194 A: CheapClone + Send + Sync + 'static,
195{
196 const ALIVE: u8 = 0;
197 const NOT_ALIVE: u8 = 1;
198 const CLOCK: u8 = 2;
199 const EVENT_CLOCK: u8 = 3;
200 const QUERY_CLOCK: u8 = 4;
201 const COORDINATE: u8 = 5;
202 const LEAVE: u8 = 6;
203 const COMMENT: u8 = 7;
204
205 fn encode<T: TransformDelegate<Id = I, Address = A>, W: Write>(
206 &self,
207 w: &mut W,
208 ) -> std::io::Result<usize> {
209 match self {
210 Self::Alive(id) => encode!(w.id::ALIVE),
211 Self::NotAlive(id) => encode!(w.id::NOT_ALIVE),
212 Self::Clock(t) => encode!(w.t(CLOCK)),
213 Self::EventClock(t) => encode!(w.t(EVENT_CLOCK)),
214 Self::QueryClock(t) => encode!(w.t(QUERY_CLOCK)),
215 Self::Coordinate => encode!(w.COORDINATE),
216 Self::Leave => encode!(w.LEAVE),
217 Self::Comment => encode!(w.COMMENT),
218 }
219 }
220}
221
222#[viewit::viewit]
223pub(crate) struct ReplayResult<I, A> {
224 alive_nodes: HashSet<Node<I, A>>,
225 last_clock: LamportTime,
226 last_event_clock: LamportTime,
227 last_query_clock: LamportTime,
228 offset: u64,
229 fh: File,
230 path: PathBuf,
231}
232
233pub(crate) fn open_and_replay_snapshot<
234 I: Id,
235 A: CheapClone + core::hash::Hash + Eq + Send + Sync + 'static,
236 T: TransformDelegate<Id = I, Address = A>,
237 P: AsRef<std::path::Path>,
238>(
239 p: &P,
240 rejoin_after_leave: bool,
241) -> Result<ReplayResult<I, A>, SnapshotError> {
242 #[cfg(unix)]
244 let fh = OpenOptions::new()
245 .create(true)
246 .append(true)
247 .read(true)
248 .mode(0o644)
249 .open(p)
250 .map_err(SnapshotError::Open)?;
251 #[cfg(not(unix))]
252 let fh = OpenOptions::new()
253 .create(true)
254 .append(true)
255 .read(true)
256 .write(true)
257 .open(p)
258 .map_err(SnapshotError::Open)?;
259
260 let offset = fh.metadata().map_err(SnapshotError::Stat)?.len();
262
263 let mut reader = BufReader::new(fh);
265 let mut buf = Vec::new();
266 let mut alive_nodes = HashSet::new();
267 let mut last_clock = LamportTime::ZERO;
268 let mut last_event_clock = LamportTime::ZERO;
269 let mut last_query_clock = LamportTime::ZERO;
270
271 loop {
272 let kind = match reader.read_u8() {
273 Ok(b) => SnapshotRecordType::try_from(b)?,
274 Err(e) => {
275 if e.kind() == std::io::ErrorKind::UnexpectedEof {
276 break;
277 }
278 return Err(SnapshotError::Replay(e));
279 }
280 };
281
282 match kind {
283 SnapshotRecordType::Alive => {
284 let len = reader
285 .read_u32::<LittleEndian>()
286 .map_err(SnapshotError::Replay)? as usize;
287 buf.resize(len, 0);
288 reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
289
290 let (_, node) =
291 T::decode_node(&buf).map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
292 alive_nodes.insert(node);
293 }
294 SnapshotRecordType::NotAlive => {
295 let len = reader
296 .read_u32::<LittleEndian>()
297 .map_err(SnapshotError::Replay)? as usize;
298 buf.resize(len, 0);
299 reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
300
301 let (_, node) =
302 T::decode_node(&buf).map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
303 alive_nodes.remove(&node);
304 }
305 SnapshotRecordType::Clock => {
306 let t = reader
307 .read_u64::<LittleEndian>()
308 .map_err(SnapshotError::Replay)?;
309 last_clock = LamportTime::new(t);
310 }
311 SnapshotRecordType::EventClock => {
312 let t = reader
313 .read_u64::<LittleEndian>()
314 .map_err(SnapshotError::Replay)?;
315 last_event_clock = LamportTime::new(t);
316 }
317 SnapshotRecordType::QueryClock => {
318 let t = reader
319 .read_u64::<LittleEndian>()
320 .map_err(SnapshotError::Replay)?;
321 last_query_clock = LamportTime::new(t);
322 }
323 SnapshotRecordType::Coordinate => continue,
324 SnapshotRecordType::Leave => {
325 if rejoin_after_leave {
327 tracing::info!("serf: ignoring previous leave in snapshot");
328 continue;
329 }
330 alive_nodes.clear();
331 last_clock = LamportTime::ZERO;
332 last_event_clock = LamportTime::ZERO;
333 last_query_clock = LamportTime::ZERO;
334 }
335 SnapshotRecordType::Comment => continue,
336 }
337 }
338
339 let mut f = reader.into_inner();
341
342 f.seek(std::io::SeekFrom::End(0))
343 .map(|_| ReplayResult {
344 alive_nodes,
345 last_clock,
346 last_event_clock,
347 last_query_clock,
348 offset,
349 fh: f,
350 path: p.as_ref().to_path_buf(),
351 })
352 .map_err(SnapshotError::SeekEnd)
353}
354
355pub(crate) struct SnapshotHandle {
356 wait_rx: Receiver<()>,
357 shutdown_rx: Receiver<()>,
358 leave_tx: Sender<()>,
359}
360
361impl SnapshotHandle {
362 pub(crate) async fn wait(&self) {
364 let _ = self.wait_rx.recv().await;
365 }
366
367 pub(crate) async fn leave(&self) {
370 futures::select! {
371 _ = self.leave_tx.send(()).fuse() => {},
372 _ = self.shutdown_rx.recv().fuse() => {},
373 }
374 }
375}
376
377pub(crate) struct Snapshot<T, D>
380where
381 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
382 T: Transport,
383{
384 alive_nodes: HashSet<Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
385 clock: LamportClock,
386 fh: Option<BufWriter<File>>,
387 last_flush: Epoch,
388 last_clock: LamportTime,
389 last_event_clock: LamportTime,
390 last_query_clock: LamportTime,
391 leave_rx: Receiver<()>,
392 leaving: bool,
393 min_compact_size: u64,
394 path: PathBuf,
395 offset: u64,
396 rejoin_after_leave: bool,
397 stream_rx: Receiver<CrateEvent<T, D>>,
398 shutdown_rx: Receiver<()>,
399 wait_tx: Sender<()>,
400 last_attempted_compaction: Epoch,
401 #[cfg(feature = "metrics")]
402 metric_labels: std::sync::Arc<memberlist_core::types::MetricLabels>,
403}
404
/// Records one event from the snapshot stream in `$this`, unless a leave has
/// already been handled, in which case the event is skipped.
///
/// The macro deliberately contains no loop control. Skipping an event must not
/// terminate the caller's loop: in the main stream loop that would stop clock
/// persistence and close the wait channel long before shutdown, and in the
/// shutdown drain it would stop draining early.
macro_rules! stream_flush_event {
  ($this:ident <- $event:ident) => {{
    if !$this.leaving {
      match &$event {
        CrateEvent::Member(e) => $this.process_member_event(e),
        CrateEvent::User(e) => $this.process_user_event(e),
        CrateEvent::Query(e) => $this.process_query_event(e.ltime),
        CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime),
      }
    }
  }};
}
421
/// Forwards `$event` to `$stream_tx` (as a clone) and then to `$out_tx`
/// without ever waiting. If either channel is full or closed, that copy of the
/// event is dropped.
macro_rules! tee_stream_flush_event {
  ($stream_tx:ident <- $event:ident -> $out_tx:ident) => {{
    // Internal snapshot stream: best effort.
    let _ = $stream_tx.try_send($event.clone());
    // Outgoing event stream: also best effort.
    // NOTE(review): events are dropped when `$out_tx` is full; confirm this
    // is intended rather than applying back-pressure.
    let _ = $out_tx.try_send($event);
  }};
}
437
438impl<D, T> Snapshot<T, D>
439where
440 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
441 T: Transport,
442{
443 #[allow(clippy::type_complexity)]
444 pub(crate) fn from_replay_result(
445 replay_result: ReplayResult<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
446 min_compact_size: u64,
447 rejoin_after_leave: bool,
448 clock: LamportClock,
449 out_tx: Sender<CrateEvent<T, D>>,
450 shutdown_rx: Receiver<()>,
451 #[cfg(feature = "metrics")] metric_labels: std::sync::Arc<memberlist_core::types::MetricLabels>,
452 ) -> Result<
453 (
454 Sender<CrateEvent<T, D>>,
455 TinyVec<Node<T::Id, MaybeResolvedAddress<T>>>,
456 SnapshotHandle,
457 ),
458 SnapshotError,
459 > {
460 let (in_tx, in_rx) = async_channel::bounded(EVENT_CH_SIZE);
461 let (stream_tx, stream_rx) = async_channel::bounded(EVENT_CH_SIZE);
462 let (leave_tx, leave_rx) = async_channel::bounded(1);
463 let (wait_tx, wait_rx) = async_channel::bounded(1);
464
465 let ReplayResult {
466 alive_nodes,
467 last_clock,
468 last_event_clock,
469 last_query_clock,
470 offset,
471 fh,
472 path,
473 } = replay_result;
474
475 let this = Self {
477 alive_nodes,
478 clock,
479 fh: Some(BufWriter::new(fh)),
480 last_flush: Epoch::now(),
481 last_clock,
482 last_event_clock,
483 last_query_clock,
484 leave_rx,
485 leaving: false,
486 min_compact_size,
487 path,
488 offset,
489 rejoin_after_leave,
490 stream_rx,
491 shutdown_rx: shutdown_rx.clone(),
492 wait_tx,
493 last_attempted_compaction: Epoch::now(),
494 #[cfg(feature = "metrics")]
495 metric_labels,
496 };
497
498 let mut alive_nodes = this
499 .alive_nodes
500 .iter()
501 .map(|n| {
502 let id = n.id().cheap_clone();
503 let addr = n.address().cheap_clone();
504 Node::new(id, MaybeResolvedAddress::resolved(addr))
505 })
506 .collect::<TinyVec<_>>();
507 alive_nodes.shuffle(&mut rand::thread_rng());
508
509 let handle = <T::Runtime as RuntimeLite>::spawn(Self::tee_stream(
511 in_rx,
512 stream_tx,
513 out_tx,
514 shutdown_rx.clone(),
515 ));
516 <T::Runtime as RuntimeLite>::spawn_detach(this.stream(handle));
517
518 Ok((
519 in_tx,
520 alive_nodes,
521 SnapshotHandle {
522 wait_rx,
523 shutdown_rx,
524 leave_tx,
525 },
526 ))
527 }
528
529 async fn tee_stream(
532 in_rx: Receiver<CrateEvent<T, D>>,
533 stream_tx: Sender<CrateEvent<T, D>>,
534 out_tx: Sender<CrateEvent<T, D>>,
535 shutdown_rx: Receiver<()>,
536 ) {
537 loop {
538 futures::select! {
539 ev = in_rx.recv().fuse() => {
540 if let Ok(ev) = ev {
541 tee_stream_flush_event!(stream_tx <- ev -> out_tx)
542 } else {
543 break;
544 }
545 }
546 _ = shutdown_rx.recv().fuse() => {
547 break;
548 }
549 }
550 }
551
552 loop {
554 futures::select! {
555 ev = in_rx.recv().fuse() => {
556 if let Ok(ev) = ev {
557 tee_stream_flush_event!(stream_tx <- ev -> out_tx)
558 } else {
559 break;
560 }
561 }
562 default => break,
563 }
564 }
565 tracing::debug!("serf: snapshotter tee stream exits");
566 }
567
568 fn handle_leave(&mut self) {
569 self.leaving = true;
570
571 if !self.rejoin_after_leave {
573 self.alive_nodes.clear();
574 }
575 self.try_append(SnapshotRecord::Leave);
576 if let Some(fh) = self.fh.as_mut() {
577 if let Err(e) = fh.flush() {
578 tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
579 }
580
581 if let Err(e) = fh.get_mut().sync_all() {
582 tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
583 }
584 }
585 }
586
587 async fn stream(
589 mut self,
590 tee_handle: <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>,
591 ) {
592 let mut clock_ticker = <T::Runtime as RuntimeLite>::interval(CLOCK_UPDATE_INTERVAL);
593
594 loop {
595 futures::select! {
596 signal = self.leave_rx.recv().fuse() => {
597 if signal.is_ok() {
598 self.handle_leave();
599 }
600 }
601 ev = self.stream_rx.recv().fuse() => {
602 if let Ok(ev) = ev {
603 stream_flush_event!(self <- ev)
604 } else {
605 break;
606 }
607 }
608 _ = futures::StreamExt::next(&mut clock_ticker).fuse() => {
609 self.update_clock();
610 }
611 _ = self.shutdown_rx.recv().fuse() => {
612 break;
613 }
614 }
615 }
616
617 if self.leave_rx.try_recv().is_ok() {
618 self.handle_leave();
619 }
620
621 let flush_timeout = <T::Runtime as RuntimeLite>::sleep(SHUTDOWN_FLUSH_TIMEOUT);
623 futures::pin_mut!(flush_timeout);
624
625 self.update_clock();
627
628 loop {
630 futures::select! {
631 ev = self.stream_rx.recv().fuse() => {
632 if let Ok(ev) = ev {
633 stream_flush_event!(self <- ev)
634 } else {
635 break;
636 }
637 }
638 _ = (&mut flush_timeout).fuse() => {
639 break;
640 }
641 default => {
642 break;
643 }
644 }
645 }
646
647 if let Some(fh) = self.fh.as_mut() {
648 if let Err(e) = fh.flush() {
649 tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
650 }
651
652 if let Err(e) = fh.get_mut().sync_all() {
653 tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
654 }
655 }
656
657 self.wait_tx.close();
658 tee_handle.await;
659 tracing::debug!("serf: snapshotter stream exits");
660 }
661
662 fn process_user_event(&mut self, e: &UserEventMessage) {
664 let ltime = e.ltime();
666 if ltime <= self.last_event_clock {
667 return;
668 }
669
670 self.last_event_clock = ltime;
671 self.try_append(SnapshotRecord::EventClock(ltime));
672 }
673
674 fn process_query_event(&mut self, ltime: LamportTime) {
676 if ltime <= self.last_query_clock {
678 return;
679 }
680
681 self.last_query_clock = ltime;
682 self.try_append(SnapshotRecord::QueryClock(ltime));
683 }
684
685 fn process_member_event(
687 &mut self,
688 e: &MemberEvent<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
689 ) {
690 match e.ty {
691 MemberEventType::Join => {
692 for m in e.members() {
693 let node = m.node();
694 self.alive_nodes.insert(node.cheap_clone());
695 self.try_append(SnapshotRecord::Alive(Cow::Borrowed(node)))
696 }
697 }
698 MemberEventType::Leave | MemberEventType::Failed => {
699 for m in e.members() {
700 let node = m.node();
701 self.alive_nodes.remove(node);
702 self.try_append(SnapshotRecord::NotAlive(Cow::Borrowed(node)));
703 }
704 }
705 _ => {}
706 }
707 self.update_clock();
708 }
709
710 fn update_clock(&mut self) {
714 let t: u64 = self.clock.time().into();
715 let last_seen = LamportTime::from(t.saturating_sub(1));
716 if last_seen > self.last_clock {
717 self.last_clock = last_seen;
718 self.try_append(SnapshotRecord::Clock(self.last_clock));
719 }
720 }
721
722 fn try_append(
723 &mut self,
724 l: SnapshotRecord<'_, T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
725 ) {
726 if let Err(e) = self.append_line(l) {
727 tracing::error!(err = %e, "serf: failed to update snapshot");
728 if self.last_attempted_compaction.elapsed() > SNAPSHOT_ERROR_RECOVERY_INTERVAL {
729 self.last_attempted_compaction = Epoch::now();
730 tracing::info!("serf: attempting compaction to recover from error...");
731 if let Err(e) = self.compact() {
732 tracing::error!(err = %e, "serf: compaction failed, will reattempt after {}s", SNAPSHOT_ERROR_RECOVERY_INTERVAL.as_secs());
733 } else {
734 tracing::info!("serf: finished compaction, successfully recovered from error state");
735 }
736 }
737 }
738 }
739
740 fn append_line(
741 &mut self,
742 l: SnapshotRecord<'_, T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
743 ) -> Result<(), SnapshotError> {
744 #[cfg(feature = "metrics")]
745 let start = crate::types::Epoch::now();
746
747 #[cfg(feature = "metrics")]
748 let metric_labels = self.metric_labels.clone();
749 #[cfg(feature = "metrics")]
750 scopeguard::defer!(
751 metrics::histogram!("serf.snapshot.append_line", metric_labels.iter())
752 .record(start.elapsed().as_millis() as f64)
753 );
754
755 let f = self.fh.as_mut().unwrap();
756 let n = l.encode::<D, _>(f).map_err(SnapshotError::Write)?;
757
758 if self.last_flush.elapsed() > FLUSH_INTERVAL {
760 self.last_flush = Epoch::now();
761 self
762 .fh
763 .as_mut()
764 .unwrap()
765 .flush()
766 .map_err(SnapshotError::Flush)?;
767 }
768
769 self.offset += n as u64;
771 if self.offset > self.snapshot_max_size() {
772 self.compact()?;
773 }
774 Ok(())
775 }
776
777 fn snapshot_max_size(&self) -> u64 {
779 let nodes = self.alive_nodes.len() as u64;
780 let est_size = nodes * SNAPSHOT_BYTES_PER_NODE as u64;
781 let threshold = est_size * SNAPSHOT_COMPACTION_THRESHOLD as u64;
782 threshold.max(self.min_compact_size)
783 }
784
785 fn compact(&mut self) -> Result<(), SnapshotError> {
787 #[cfg(feature = "metrics")]
788 let start = crate::types::Epoch::now();
789
790 #[cfg(feature = "metrics")]
791 let metric_labels = self.metric_labels.clone();
792 #[cfg(feature = "metrics")]
793 scopeguard::defer!(
794 metrics::histogram!("serf.snapshot.compact", metric_labels.iter())
795 .record(start.elapsed().as_millis() as f64)
796 );
797
798 let new_path = self.path.with_extension(TMP_EXT);
800 #[cfg(unix)]
801 let fh = OpenOptions::new()
802 .create(true)
803 .write(true)
804 .truncate(true)
805 .mode(0o755)
806 .open(&new_path)
807 .map_err(SnapshotError::OpenNew)?;
808
809 #[cfg(not(unix))]
810 let fh = OpenOptions::new()
811 .create(true)
812 .write(true)
813 .truncate(true)
814 .open(&new_path)
815 .map_err(SnapshotError::OpenNew)?;
816
817 let mut buf = BufWriter::new(fh);
819
820 let mut offset = 0u64;
822 for node in self.alive_nodes.iter() {
823 offset += SnapshotRecord::Alive(Cow::Borrowed(node))
824 .encode::<D, _>(&mut buf)
825 .map_err(SnapshotError::WriteNew)? as u64;
826 }
827
828 offset += SnapshotRecord::Clock(self.last_clock)
830 .encode::<D, _>(&mut buf)
831 .map_err(SnapshotError::WriteNew)? as u64;
832
833 offset += SnapshotRecord::EventClock(self.last_event_clock)
834 .encode::<D, _>(&mut buf)
835 .map_err(SnapshotError::WriteNew)? as u64;
836
837 offset += SnapshotRecord::QueryClock(self.last_query_clock)
838 .encode::<D, _>(&mut buf)
839 .map_err(SnapshotError::WriteNew)? as u64;
840
841 buf.flush().map_err(SnapshotError::Flush)?;
843
844 buf.get_ref().sync_all().map_err(SnapshotError::Sync)?;
846 drop(buf);
847
848 let mut old = self.fh.take().unwrap();
858 let _ = old.flush();
859 drop(old);
860
861 if let Err(e) = std::fs::remove_file(&self.path) {
863 if !matches!(e.kind(), std::io::ErrorKind::NotFound) {
864 return Err(SnapshotError::Remove(e));
865 }
866 }
867
868 std::fs::rename(&new_path, &self.path).map_err(SnapshotError::Install)?;
870
871 #[cfg(unix)]
873 let fh = OpenOptions::new()
874 .create(true)
875 .append(true)
876 .read(true)
877 .mode(0o755)
878 .open(&self.path)
879 .map_err(SnapshotError::Open)?;
880
881 #[cfg(not(unix))]
882 let fh = OpenOptions::new()
883 .create(true)
884 .append(true)
885 .read(true)
886 .write(true)
887 .open(&self.path)
888 .map_err(SnapshotError::Open)?;
889
890 self.fh = Some(BufWriter::new(fh));
891 self.offset = offset;
892 self.last_flush = Epoch::now();
893 Ok(())
894 }
895}