1use std::{
2 borrow::Cow,
3 collections::HashSet,
4 fs::{File, OpenOptions},
5 io::{BufReader, BufWriter, Read, Seek, Write},
6 mem,
7 path::PathBuf,
8 time::Duration,
9};
10
11#[cfg(unix)]
12use std::os::unix::prelude::OpenOptionsExt;
13
14use crate::types::UserEventMessage;
15use async_channel::{Receiver, Sender};
16use byteorder::{LittleEndian, ReadBytesExt};
17use futures::FutureExt;
18use memberlist_core::{
19 CheapClone,
20 agnostic_lite::{AsyncSpawner, RuntimeLite},
21 bytes::{BufMut, BytesMut},
22 proto::{Data, MaybeResolvedAddress, TinyVec},
23 tracing,
24 transport::{Id, Node, Transport},
25};
26use rand::seq::SliceRandom;
27
28use crate::{
29 delegate::Delegate,
30 event::{CrateEvent, MemberEvent, MemberEventType},
31 invalid_data_io_error,
32 types::{Epoch, LamportClock, LamportTime},
33};
34
/// How long buffered snapshot writes may sit in memory before a flush is forced.
const FLUSH_INTERVAL: Duration = Duration::from_millis(500);

/// Period of the ticker that records the current Lamport clock in the snapshot.
const CLOCK_UPDATE_INTERVAL: Duration = Duration::from_millis(500);

/// Extension used for the temporary file that a compaction is written to.
const TMP_EXT: &str = "compact";

/// Minimum delay between compactions attempted to recover from a failed
/// snapshot write.
const SNAPSHOT_ERROR_RECOVERY_INTERVAL: Duration = Duration::from_secs(30);

/// Capacity of the bounded channels that carry events into the snapshotter.
const EVENT_CH_SIZE: usize = 2048;

/// Upper bound on how long shutdown keeps draining already-queued events.
const SHUTDOWN_FLUSH_TIMEOUT: Duration = Duration::from_millis(250);

/// Estimated number of snapshot bytes each alive node accounts for.
const SNAPSHOT_BYTES_PER_NODE: usize = 128;

/// Multiple of the estimated snapshot size the file may grow to before it is
/// compacted.
const SNAPSHOT_COMPACTION_THRESHOLD: usize = 2;
61
62#[derive(Debug, thiserror::Error)]
64pub enum SnapshotError {
65 #[error("failed to open snapshot: {0}")]
67 Open(std::io::Error),
68 #[error("failed to open new snapshot: {0}")]
70 OpenNew(std::io::Error),
71 #[error("failed to flush new snapshot: {0}")]
73 FlushNew(std::io::Error),
74 #[error("failed to flush snapshot: {0}")]
76 Flush(std::io::Error),
77 #[error("failed to fsync snapshot: {0}")]
79 Sync(std::io::Error),
80 #[error("failed to stat snapshot: {0}")]
82 Stat(std::io::Error),
83 #[error("failed to remove old snapshot: {0}")]
85 Remove(std::io::Error),
86 #[error("failed to install new snapshot: {0}")]
88 Install(std::io::Error),
89 #[error("failed to write to new snapshot: {0}")]
91 WriteNew(std::io::Error),
92 #[error("failed to write to snapshot: {0}")]
94 Write(std::io::Error),
95 #[error("failed to seek to beginning of snapshot: {0}")]
97 SeekStart(std::io::Error),
98 #[error("failed to seek to end of snapshot: {0}")]
100 SeekEnd(std::io::Error),
101 #[error("failed to replay snapshot: {0}")]
103 Replay(std::io::Error),
104 #[error(transparent)]
106 UnknownRecordType(#[from] UnknownRecordType),
107}
108
109#[derive(Debug, thiserror::Error)]
112#[error("unrecognized snapshot record type: {0}")]
113pub struct UnknownRecordType(u8);
114
/// Tag byte written in front of every snapshot record.
///
/// The discriminants are what ends up on disk and are read back during
/// replay, so existing values must not be changed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
enum SnapshotRecordType {
    /// Followed by a little-endian `u32` length and an encoded node.
    Alive = 0,
    /// Followed by a little-endian `u32` length and an encoded node.
    NotAlive = 1,
    /// Followed by a little-endian `u64` Lamport time.
    Clock = 2,
    /// Followed by a little-endian `u64` user-event Lamport time.
    EventClock = 3,
    /// Followed by a little-endian `u64` query Lamport time.
    QueryClock = 4,
    /// No payload; skipped on replay.
    Coordinate = 5,
    /// No payload; marks that the local node left.
    Leave = 6,
    /// No payload; skipped on replay.
    Comment = 7,
}
127
128impl TryFrom<u8> for SnapshotRecordType {
129 type Error = UnknownRecordType;
130
131 fn try_from(value: u8) -> Result<Self, Self::Error> {
132 match value {
133 0 => Ok(Self::Alive),
134 1 => Ok(Self::NotAlive),
135 2 => Ok(Self::Clock),
136 3 => Ok(Self::EventClock),
137 4 => Ok(Self::QueryClock),
138 5 => Ok(Self::Coordinate),
139 6 => Ok(Self::Leave),
140 7 => Ok(Self::Comment),
141 v => Err(UnknownRecordType(v)),
142 }
143 }
144}
145
146#[allow(dead_code)]
147enum SnapshotRecord<'a, I: Clone, A: Clone> {
148 Alive(Cow<'a, Node<I, A>>),
149 NotAlive(Cow<'a, Node<I, A>>),
150 Clock(LamportTime),
151 EventClock(LamportTime),
152 QueryClock(LamportTime),
153 Coordinate,
154 Leave,
155 Comment,
156}
157
/// Node records whose complete encoding (tag + length + node) fits in this
/// many bytes are staged in a stack buffer instead of a heap allocation.
const MAX_INLINED_BYTES: usize = 64;
159
/// Writes one snapshot record to a writer and evaluates to
/// `std::io::Result<usize>` with the number of bytes written.
///
/// Must be expanded inside an `impl` that defines the tag constants named via
/// `Self::`, in a function returning `std::io::Result` (node encoding errors
/// are converted with `invalid_data_io_error` and propagated with `?`).
///
/// Forms:
/// * `encode!(w.node::TAG)`: tag byte, little-endian `u32` node length, then
///   the node encoded through `node.as_ref().encode(&mut [u8])`.
/// * `encode!(w.time(TAG))`: tag byte followed by `time.to_le_bytes()`
///   (8 bytes).
/// * `encode!(w.TAG)`: the tag byte alone.
macro_rules! encode {
    ($w:ident.$node: ident::$status: ident) => {{
        let node = $node.as_ref();
        let encoded_node_len = node.encoded_len();
        // 1 tag byte + 4 length bytes + the node itself.
        let encoded_len = 4 + 1 + encoded_node_len;
        if encoded_len <= MAX_INLINED_BYTES {
            let mut buf = [0u8; MAX_INLINED_BYTES];
            buf[0] = Self::$status;
            buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes());
            node.encode(&mut buf[5..]).map_err(invalid_data_io_error)?;
            $w.write_all(&buf[..encoded_len]).map(|_| encoded_len)
        } else {
            let mut buf = BytesMut::with_capacity(encoded_len);
            buf.put_u8(Self::$status);
            buf.put_u32_le(encoded_node_len as u32);
            // The node encoder fills a pre-sized slice. Capacity is not length:
            // passing `&mut buf` here would expose only the 5 header bytes
            // written so far, so the node could never fit. Grow the buffer to
            // its final length and encode into the space after the header.
            buf.resize(encoded_len, 0);
            node.encode(&mut buf[5..]).map_err(invalid_data_io_error)?;
            $w.write_all(&buf).map(|_| encoded_len)
        }
    }};
    ($w:ident.$t: ident($status: ident)) => {{
        const N: usize = mem::size_of::<u8>() + mem::size_of::<u64>();
        let mut data = [0u8; N];
        data[0] = Self::$status;
        data[1..N].copy_from_slice(&$t.to_le_bytes());
        $w.write_all(&data).map(|_| N)
    }};
    ($w:ident.$ident: ident) => {{ $w.write_all(&[Self::$ident]).map(|_| 1) }};
}
188
189impl<I, A> SnapshotRecord<'_, I, A>
190where
191 I: Id + Data,
192 A: CheapClone + Data + Send + Sync + 'static,
193{
194 const ALIVE: u8 = 0;
195 const NOT_ALIVE: u8 = 1;
196 const CLOCK: u8 = 2;
197 const EVENT_CLOCK: u8 = 3;
198 const QUERY_CLOCK: u8 = 4;
199 const COORDINATE: u8 = 5;
200 const LEAVE: u8 = 6;
201 const COMMENT: u8 = 7;
202
203 fn encode<W: Write>(&self, w: &mut W) -> std::io::Result<usize> {
204 match self {
205 Self::Alive(id) => encode!(w.id::ALIVE),
206 Self::NotAlive(id) => encode!(w.id::NOT_ALIVE),
207 Self::Clock(t) => encode!(w.t(CLOCK)),
208 Self::EventClock(t) => encode!(w.t(EVENT_CLOCK)),
209 Self::QueryClock(t) => encode!(w.t(QUERY_CLOCK)),
210 Self::Coordinate => encode!(w.COORDINATE),
211 Self::Leave => encode!(w.LEAVE),
212 Self::Comment => encode!(w.COMMENT),
213 }
214 }
215}
216
217#[viewit::viewit]
218pub(crate) struct ReplayResult<I, A> {
219 alive_nodes: HashSet<Node<I, A>>,
220 last_clock: LamportTime,
221 last_event_clock: LamportTime,
222 last_query_clock: LamportTime,
223 offset: u64,
224 fh: File,
225 path: PathBuf,
226}
227
228pub(crate) fn open_and_replay_snapshot<
229 I: Id + Data,
230 A: CheapClone + Data + core::hash::Hash + Eq + Send + Sync + 'static,
231 P: AsRef<std::path::Path>,
232>(
233 p: &P,
234 rejoin_after_leave: bool,
235) -> Result<ReplayResult<I, A>, SnapshotError> {
236 #[cfg(unix)]
238 let fh = OpenOptions::new()
239 .create(true)
240 .append(true)
241 .read(true)
242 .mode(0o644)
243 .open(p)
244 .map_err(SnapshotError::Open)?;
245 #[cfg(not(unix))]
246 let fh = OpenOptions::new()
247 .create(true)
248 .append(true)
249 .read(true)
250 .write(true)
251 .open(p)
252 .map_err(SnapshotError::Open)?;
253
254 let offset = fh.metadata().map_err(SnapshotError::Stat)?.len();
256
257 let mut reader = BufReader::new(fh);
259 let mut buf = Vec::new();
260 let mut alive_nodes = HashSet::new();
261 let mut last_clock = LamportTime::ZERO;
262 let mut last_event_clock = LamportTime::ZERO;
263 let mut last_query_clock = LamportTime::ZERO;
264
265 loop {
266 let kind = match reader.read_u8() {
267 Ok(b) => SnapshotRecordType::try_from(b)?,
268 Err(e) => {
269 if e.kind() == std::io::ErrorKind::UnexpectedEof {
270 break;
271 }
272 return Err(SnapshotError::Replay(e));
273 }
274 };
275
276 match kind {
277 SnapshotRecordType::Alive => {
278 let len = reader
279 .read_u32::<LittleEndian>()
280 .map_err(SnapshotError::Replay)? as usize;
281 buf.resize(len, 0);
282 reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
283
284 let (_, node) = <Node<I, A> as Data>::decode(&buf)
285 .map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
286 alive_nodes.insert(node);
287 }
288 SnapshotRecordType::NotAlive => {
289 let len = reader
290 .read_u32::<LittleEndian>()
291 .map_err(SnapshotError::Replay)? as usize;
292 buf.resize(len, 0);
293 reader.read_exact(&mut buf).map_err(SnapshotError::Replay)?;
294
295 let (_, node) = <Node<I, A> as Data>::decode(&buf)
296 .map_err(|e| SnapshotError::Replay(invalid_data_io_error(e)))?;
297 alive_nodes.remove(&node);
298 }
299 SnapshotRecordType::Clock => {
300 let t = reader
301 .read_u64::<LittleEndian>()
302 .map_err(SnapshotError::Replay)?;
303 last_clock = LamportTime::new(t);
304 }
305 SnapshotRecordType::EventClock => {
306 let t = reader
307 .read_u64::<LittleEndian>()
308 .map_err(SnapshotError::Replay)?;
309 last_event_clock = LamportTime::new(t);
310 }
311 SnapshotRecordType::QueryClock => {
312 let t = reader
313 .read_u64::<LittleEndian>()
314 .map_err(SnapshotError::Replay)?;
315 last_query_clock = LamportTime::new(t);
316 }
317 SnapshotRecordType::Coordinate => continue,
318 SnapshotRecordType::Leave => {
319 if rejoin_after_leave {
321 tracing::info!("serf: ignoring previous leave in snapshot");
322 continue;
323 }
324 alive_nodes.clear();
325 last_clock = LamportTime::ZERO;
326 last_event_clock = LamportTime::ZERO;
327 last_query_clock = LamportTime::ZERO;
328 }
329 SnapshotRecordType::Comment => continue,
330 }
331 }
332
333 let mut f = reader.into_inner();
335
336 f.seek(std::io::SeekFrom::End(0))
337 .map(|_| ReplayResult {
338 alive_nodes,
339 last_clock,
340 last_event_clock,
341 last_query_clock,
342 offset,
343 fh: f,
344 path: p.as_ref().to_path_buf(),
345 })
346 .map_err(SnapshotError::SeekEnd)
347}
348
349pub(crate) struct SnapshotHandle {
350 wait_rx: Receiver<()>,
351 shutdown_rx: Receiver<()>,
352 leave_tx: Sender<()>,
353}
354
355impl SnapshotHandle {
356 pub(crate) async fn wait(&self) {
358 let _ = self.wait_rx.recv().await;
359 }
360
361 pub(crate) async fn leave(&self) {
364 futures::select! {
365 _ = self.leave_tx.send(()).fuse() => {},
366 _ = self.shutdown_rx.recv().fuse() => {},
367 }
368 }
369}
370
371pub(crate) struct Snapshot<T, D>
374where
375 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
376 T: Transport,
377{
378 alive_nodes: HashSet<Node<T::Id, T::ResolvedAddress>>,
379 clock: LamportClock,
380 fh: Option<BufWriter<File>>,
381 last_flush: Epoch,
382 last_clock: LamportTime,
383 last_event_clock: LamportTime,
384 last_query_clock: LamportTime,
385 leave_rx: Receiver<()>,
386 leaving: bool,
387 min_compact_size: u64,
388 path: PathBuf,
389 offset: u64,
390 rejoin_after_leave: bool,
391 stream_rx: Receiver<CrateEvent<T, D>>,
392 shutdown_rx: Receiver<()>,
393 wait_tx: Sender<()>,
394 last_attempted_compaction: Epoch,
395 #[cfg(feature = "metrics")]
396 metric_labels: std::sync::Arc<memberlist_core::proto::MetricLabels>,
397}
398
/// Records one event in the snapshot through the matching `process_*` method.
///
/// Must be expanded inside a `loop`. Once `$this.leaving` is set, the macro
/// executes `break`, which terminates the caller's enclosing loop rather than
/// merely skipping this event.
// NOTE(review): confirm that exiting the loop (instead of just ignoring events
// that arrive after a leave) is the intended behaviour.
macro_rules! stream_flush_event {
    ($this:ident <- $event:ident) => {{
        if !$this.leaving {
            match &$event {
                CrateEvent::Member(e) => $this.process_member_event(e),
                CrateEvent::User(e) => $this.process_user_event(e),
                CrateEvent::Query(e) => $this.process_query_event(e.ltime),
                CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime),
            }
        } else {
            break;
        }
    }};
}
415
/// Forwards an event to the snapshotter's internal stream (as a clone) and to
/// the output channel (by value), without waiting on either.
///
/// Each send races a `default` arm, so a destination that cannot accept the
/// event immediately (for example, a full channel) simply does not receive it.
macro_rules! tee_stream_flush_event {
    ($stream_tx:ident <- $event:ident -> $out_tx:ident) => {{
        let for_snapshot = $event.clone();
        futures::select! {
            _ = $stream_tx.send(for_snapshot).fuse() => {}
            default => {}
        }

        futures::select! {
            _ = $out_tx.send($event).fuse() => {}
            default => {}
        }
    }};
}
431
432impl<D, T> Snapshot<T, D>
433where
434 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
435 T: Transport,
436{
437 #[allow(clippy::type_complexity)]
438 pub(crate) fn from_replay_result(
439 replay_result: ReplayResult<T::Id, T::ResolvedAddress>,
440 min_compact_size: u64,
441 rejoin_after_leave: bool,
442 clock: LamportClock,
443 out_tx: Sender<CrateEvent<T, D>>,
444 shutdown_rx: Receiver<()>,
445 #[cfg(feature = "metrics")] metric_labels: std::sync::Arc<memberlist_core::proto::MetricLabels>,
446 ) -> Result<
447 (
448 Sender<CrateEvent<T, D>>,
449 TinyVec<Node<T::Id, MaybeResolvedAddress<T::Address, T::ResolvedAddress>>>,
450 SnapshotHandle,
451 ),
452 SnapshotError,
453 > {
454 let (in_tx, in_rx) = async_channel::bounded(EVENT_CH_SIZE);
455 let (stream_tx, stream_rx) = async_channel::bounded(EVENT_CH_SIZE);
456 let (leave_tx, leave_rx) = async_channel::bounded(1);
457 let (wait_tx, wait_rx) = async_channel::bounded(1);
458
459 let ReplayResult {
460 alive_nodes,
461 last_clock,
462 last_event_clock,
463 last_query_clock,
464 offset,
465 fh,
466 path,
467 } = replay_result;
468
469 let this = Self {
471 alive_nodes,
472 clock,
473 fh: Some(BufWriter::new(fh)),
474 last_flush: Epoch::now(),
475 last_clock,
476 last_event_clock,
477 last_query_clock,
478 leave_rx,
479 leaving: false,
480 min_compact_size,
481 path,
482 offset,
483 rejoin_after_leave,
484 stream_rx,
485 shutdown_rx: shutdown_rx.clone(),
486 wait_tx,
487 last_attempted_compaction: Epoch::now(),
488 #[cfg(feature = "metrics")]
489 metric_labels,
490 };
491
492 let mut alive_nodes = this
493 .alive_nodes
494 .iter()
495 .map(|n| {
496 let id = n.id().cheap_clone();
497 let addr = n.address().cheap_clone();
498 Node::new(id, MaybeResolvedAddress::resolved(addr))
499 })
500 .collect::<TinyVec<_>>();
501 alive_nodes.shuffle(&mut rand::rng());
502
503 let handle = <T::Runtime as RuntimeLite>::spawn(Self::tee_stream(
505 in_rx,
506 stream_tx,
507 out_tx,
508 shutdown_rx.clone(),
509 ));
510 <T::Runtime as RuntimeLite>::spawn_detach(this.stream(handle));
511
512 Ok((
513 in_tx,
514 alive_nodes,
515 SnapshotHandle {
516 wait_rx,
517 shutdown_rx,
518 leave_tx,
519 },
520 ))
521 }
522
523 async fn tee_stream(
526 in_rx: Receiver<CrateEvent<T, D>>,
527 stream_tx: Sender<CrateEvent<T, D>>,
528 out_tx: Sender<CrateEvent<T, D>>,
529 shutdown_rx: Receiver<()>,
530 ) {
531 loop {
532 futures::select! {
533 ev = in_rx.recv().fuse() => {
534 if let Ok(ev) = ev {
535 tee_stream_flush_event!(stream_tx <- ev -> out_tx)
536 } else {
537 break;
538 }
539 }
540 _ = shutdown_rx.recv().fuse() => {
541 break;
542 }
543 }
544 }
545
546 loop {
548 futures::select! {
549 ev = in_rx.recv().fuse() => {
550 if let Ok(ev) = ev {
551 tee_stream_flush_event!(stream_tx <- ev -> out_tx)
552 } else {
553 break;
554 }
555 }
556 default => break,
557 }
558 }
559 tracing::debug!("serf: snapshotter tee stream exits");
560 }
561
562 fn handle_leave(&mut self) {
563 self.leaving = true;
564
565 if !self.rejoin_after_leave {
567 self.alive_nodes.clear();
568 }
569 self.try_append(SnapshotRecord::Leave);
570 if let Some(fh) = self.fh.as_mut() {
571 if let Err(e) = fh.flush() {
572 tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
573 }
574
575 if let Err(e) = fh.get_mut().sync_all() {
576 tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
577 }
578 }
579 }
580
581 async fn stream(
583 mut self,
584 tee_handle: <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>,
585 ) {
586 let mut clock_ticker = <T::Runtime as RuntimeLite>::interval(CLOCK_UPDATE_INTERVAL);
587
588 loop {
589 futures::select! {
590 signal = self.leave_rx.recv().fuse() => {
591 if signal.is_ok() {
592 self.handle_leave();
593 }
594 }
595 ev = self.stream_rx.recv().fuse() => {
596 if let Ok(ev) = ev {
597 stream_flush_event!(self <- ev)
598 } else {
599 break;
600 }
601 }
602 _ = futures::StreamExt::next(&mut clock_ticker).fuse() => {
603 self.update_clock();
604 }
605 _ = self.shutdown_rx.recv().fuse() => {
606 break;
607 }
608 }
609 }
610
611 if self.leave_rx.try_recv().is_ok() {
612 self.handle_leave();
613 }
614
615 let flush_timeout = <T::Runtime as RuntimeLite>::sleep(SHUTDOWN_FLUSH_TIMEOUT);
617 futures::pin_mut!(flush_timeout);
618
619 self.update_clock();
621
622 loop {
624 futures::select! {
625 ev = self.stream_rx.recv().fuse() => {
626 if let Ok(ev) = ev {
627 stream_flush_event!(self <- ev)
628 } else {
629 break;
630 }
631 }
632 _ = (&mut flush_timeout).fuse() => {
633 break;
634 }
635 default => {
636 break;
637 }
638 }
639 }
640
641 if let Some(fh) = self.fh.as_mut() {
642 if let Err(e) = fh.flush() {
643 tracing::error!(target="serf", err=%SnapshotError::Flush(e), "failed to flush leave to snapshot");
644 }
645
646 if let Err(e) = fh.get_mut().sync_all() {
647 tracing::error!(target="serf", err=%SnapshotError::Sync(e), "failed to sync leave to snapshot");
648 }
649 }
650
651 self.wait_tx.close();
652 if let Err(e) = tee_handle.await {
653 tracing::error!(target="serf", err=%e, "failed to wait for tee stream to exit");
654 }
655 tracing::debug!("serf: snapshotter stream exits");
656 }
657
658 fn process_user_event(&mut self, e: &UserEventMessage) {
660 let ltime = e.ltime();
662 if ltime <= self.last_event_clock {
663 return;
664 }
665
666 self.last_event_clock = ltime;
667 self.try_append(SnapshotRecord::EventClock(ltime));
668 }
669
670 fn process_query_event(&mut self, ltime: LamportTime) {
672 if ltime <= self.last_query_clock {
674 return;
675 }
676
677 self.last_query_clock = ltime;
678 self.try_append(SnapshotRecord::QueryClock(ltime));
679 }
680
681 fn process_member_event(&mut self, e: &MemberEvent<T::Id, T::ResolvedAddress>) {
683 match e.ty {
684 MemberEventType::Join => {
685 for m in e.members() {
686 let node = m.node();
687 self.alive_nodes.insert(node.cheap_clone());
688 self.try_append(SnapshotRecord::Alive(Cow::Borrowed(node)))
689 }
690 }
691 MemberEventType::Leave | MemberEventType::Failed => {
692 for m in e.members() {
693 let node = m.node();
694 self.alive_nodes.remove(node);
695 self.try_append(SnapshotRecord::NotAlive(Cow::Borrowed(node)));
696 }
697 }
698 _ => {}
699 }
700 self.update_clock();
701 }
702
703 fn update_clock(&mut self) {
707 let t: u64 = self.clock.time().into();
708 let last_seen = LamportTime::from(t.saturating_sub(1));
709 if last_seen > self.last_clock {
710 self.last_clock = last_seen;
711 self.try_append(SnapshotRecord::Clock(self.last_clock));
712 }
713 }
714
715 fn try_append(&mut self, l: SnapshotRecord<'_, T::Id, T::ResolvedAddress>) {
716 if let Err(e) = self.append_line(l) {
717 tracing::error!(err = %e, "serf: failed to update snapshot");
718 if self.last_attempted_compaction.elapsed() > SNAPSHOT_ERROR_RECOVERY_INTERVAL {
719 self.last_attempted_compaction = Epoch::now();
720 tracing::info!("serf: attempting compaction to recover from error...");
721 if let Err(e) = self.compact() {
722 tracing::error!(err = %e, "serf: compaction failed, will reattempt after {}s", SNAPSHOT_ERROR_RECOVERY_INTERVAL.as_secs());
723 } else {
724 tracing::info!("serf: finished compaction, successfully recovered from error state");
725 }
726 }
727 }
728 }
729
730 fn append_line(
731 &mut self,
732 l: SnapshotRecord<'_, T::Id, T::ResolvedAddress>,
733 ) -> Result<(), SnapshotError> {
734 #[cfg(feature = "metrics")]
735 let start = crate::types::Epoch::now();
736
737 #[cfg(feature = "metrics")]
738 let metric_labels = self.metric_labels.clone();
739 #[cfg(feature = "metrics")]
740 scopeguard::defer!(
741 metrics::histogram!("serf.snapshot.append_line", metric_labels.iter())
742 .record(start.elapsed().as_millis() as f64)
743 );
744
745 let f = self.fh.as_mut().unwrap();
746 let n = l.encode(f).map_err(SnapshotError::Write)?;
747
748 if self.last_flush.elapsed() > FLUSH_INTERVAL {
750 self.last_flush = Epoch::now();
751 self
752 .fh
753 .as_mut()
754 .unwrap()
755 .flush()
756 .map_err(SnapshotError::Flush)?;
757 }
758
759 self.offset += n as u64;
761 if self.offset > self.snapshot_max_size() {
762 self.compact()?;
763 }
764 Ok(())
765 }
766
767 fn snapshot_max_size(&self) -> u64 {
769 let nodes = self.alive_nodes.len() as u64;
770 let est_size = nodes * SNAPSHOT_BYTES_PER_NODE as u64;
771 let threshold = est_size * SNAPSHOT_COMPACTION_THRESHOLD as u64;
772 threshold.max(self.min_compact_size)
773 }
774
775 fn compact(&mut self) -> Result<(), SnapshotError> {
777 #[cfg(feature = "metrics")]
778 let start = crate::types::Epoch::now();
779
780 #[cfg(feature = "metrics")]
781 let metric_labels = self.metric_labels.clone();
782 #[cfg(feature = "metrics")]
783 scopeguard::defer!(
784 metrics::histogram!("serf.snapshot.compact", metric_labels.iter())
785 .record(start.elapsed().as_millis() as f64)
786 );
787
788 let new_path = self.path.with_extension(TMP_EXT);
790 #[cfg(unix)]
791 let fh = OpenOptions::new()
792 .create(true)
793 .write(true)
794 .truncate(true)
795 .mode(0o755)
796 .open(&new_path)
797 .map_err(SnapshotError::OpenNew)?;
798
799 #[cfg(not(unix))]
800 let fh = OpenOptions::new()
801 .create(true)
802 .write(true)
803 .truncate(true)
804 .open(&new_path)
805 .map_err(SnapshotError::OpenNew)?;
806
807 let mut buf = BufWriter::new(fh);
809
810 let mut offset = 0u64;
812 for node in self.alive_nodes.iter() {
813 offset += SnapshotRecord::Alive(Cow::Borrowed(node))
814 .encode(&mut buf)
815 .map_err(SnapshotError::WriteNew)? as u64;
816 }
817
818 offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::Clock(self.last_clock)
820 .encode(&mut buf)
821 .map_err(SnapshotError::WriteNew)? as u64;
822
823 offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::EventClock(self.last_event_clock)
824 .encode(&mut buf)
825 .map_err(SnapshotError::WriteNew)? as u64;
826
827 offset += SnapshotRecord::<T::Id, T::ResolvedAddress>::QueryClock(self.last_query_clock)
828 .encode(&mut buf)
829 .map_err(SnapshotError::WriteNew)? as u64;
830
831 buf.flush().map_err(SnapshotError::Flush)?;
833
834 buf.get_ref().sync_all().map_err(SnapshotError::Sync)?;
836 drop(buf);
837
838 let mut old = self.fh.take().unwrap();
848 let _ = old.flush();
849 drop(old);
850
851 if let Err(e) = std::fs::remove_file(&self.path) {
853 if !matches!(e.kind(), std::io::ErrorKind::NotFound) {
854 return Err(SnapshotError::Remove(e));
855 }
856 }
857
858 std::fs::rename(&new_path, &self.path).map_err(SnapshotError::Install)?;
860
861 #[cfg(unix)]
863 let fh = OpenOptions::new()
864 .create(true)
865 .append(true)
866 .read(true)
867 .mode(0o755)
868 .open(&self.path)
869 .map_err(SnapshotError::Open)?;
870
871 #[cfg(not(unix))]
872 let fh = OpenOptions::new()
873 .create(true)
874 .append(true)
875 .read(true)
876 .write(true)
877 .open(&self.path)
878 .map_err(SnapshotError::Open)?;
879
880 self.fh = Some(BufWriter::new(fh));
881 self.offset = offset;
882 self.last_flush = Epoch::now();
883 Ok(())
884 }
885}