raft_engine/
purge.rs

// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use std::collections::VecDeque;
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use fail::fail_point;
use log::{info, warn};
use parking_lot::{Mutex, RwLock};

use crate::config::Config;
use crate::engine::read_entry_bytes_from_file;
use crate::event_listener::EventListener;
use crate::log_batch::{AtomicGroupBuilder, LogBatch};
use crate::memtable::{MemTableHandle, MemTables};
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue, PipeLog};
use crate::{GlobalStats, Result};

// Force compact regions whose oldest log files fall within the oldest 20% of the queue.
const FORCE_COMPACT_RATIO: f64 = 0.2;
// Only rewrite regions whose oldest log files fall within the oldest 70% of the queue.
const REWRITE_RATIO: f64 = 0.7;
// Only rewrite regions with fewer stale log entries than this threshold.
const MAX_REWRITE_ENTRIES_PER_REGION: usize = 32;
const MAX_COUNT_BEFORE_FORCE_REWRITE: u32 = 9;

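// Target size of a single rewrite batch; the `max_rewrite_batch_bytes` fail
// point lets tests override the 128 KiB default.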
fn max_batch_bytes() -> usize {
    fail_point!("max_rewrite_batch_bytes", |s| s
        .unwrap()
        .parse::<usize>()
        .unwrap());
    128 * 1024
}

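// Number of bytes that may be rewritten without a sync before the next batch
// is written with a forced sync.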
fn max_forcely_sync_bytes() -> usize {
    max_batch_bytes() * 4
}

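/// Drives log purging: asks heavy Raft groups to be compacted, rewrites the
/// remaining live entries of old files into the rewrite queue, and removes log
/// files that are no longer referenced by any memtable.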
pub struct PurgeManager<P>
where
    P: PipeLog,
{
    cfg: Arc<Config>,
    memtables: MemTables,
    pipe_log: Arc<P>,
    global_stats: Arc<GlobalStats>,
    listeners: Vec<Arc<dyn EventListener>>,

    // Only one thread can run `purge_expired_files` at a time.
    //
    // This table records Raft groups that have already been asked to force compact.
    // Those that are not compacted in time (after `MAX_COUNT_BEFORE_FORCE_REWRITE`
    // rounds) will be force rewritten.
    force_rewrite_candidates: Arc<Mutex<HashMap<u64, u32>>>,
}

impl<P> PurgeManager<P>
where
    P: PipeLog,
{
    pub fn new(
        cfg: Arc<Config>,
        memtables: MemTables,
        pipe_log: Arc<P>,
        global_stats: Arc<GlobalStats>,
        listeners: Vec<Arc<dyn EventListener>>,
    ) -> PurgeManager<P> {
        PurgeManager {
            cfg,
            memtables,
            pipe_log,
            global_stats,
            listeners,
            force_rewrite_candidates: Arc::new(Mutex::new(HashMap::default())),
        }
    }

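    /// Purges log files that are no longer needed. Returns the IDs of Raft
    /// groups that are holding back purging and should be compacted by the
    /// caller.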
    pub fn purge_expired_files(&self) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_PURGE_DURATION_HISTOGRAM);
        let guard = self.force_rewrite_candidates.try_lock();
        if guard.is_none() {
            warn!("Unable to purge expired files: locked");
            return Ok(vec![]);
        }
        let mut rewrite_candidate_regions = guard.unwrap();

        let mut should_compact = HashSet::new();
        if self.needs_rewrite_log_files(LogQueue::Rewrite) {
            should_compact.extend(self.rewrite_rewrite_queue()?);
            self.rescan_memtables_and_purge_stale_files(
                LogQueue::Rewrite,
                self.pipe_log.file_span(LogQueue::Rewrite).1,
            )?;
        }

        if self.needs_rewrite_log_files(LogQueue::Append) {
            if let (Some(rewrite_watermark), Some(compact_watermark)) =
                self.append_queue_watermarks()
            {
                let (first_append, latest_append) = self.pipe_log.file_span(LogQueue::Append);
                let append_queue_barrier =
                    self.listeners.iter().fold(latest_append, |barrier, l| {
                        l.first_file_not_ready_for_purge(LogQueue::Append)
                            .map_or(barrier, |f| std::cmp::min(f, barrier))
                    });

                // Ordering:
                // 1. Must rewrite tombstones AFTER acquiring `append_queue_barrier`, or
                //    deletion marks might be lost after restart.
                // 2. Must rewrite tombstones BEFORE rewriting entries, or entries from a
                //    recreated region might be lost after restart.
                self.rewrite_append_queue_tombstones()?;
                should_compact.extend(self.rewrite_or_compact_append_queue(
                    rewrite_watermark,
                    compact_watermark,
                    &mut rewrite_candidate_regions,
                )?);

                if append_queue_barrier == first_append && first_append < latest_append {
                    warn!("Unable to purge expired files: blocked by barrier");
                }
                self.rescan_memtables_and_purge_stale_files(
                    LogQueue::Append,
                    append_queue_barrier,
                )?;
            }
        }
        Ok(should_compact.into_iter().collect())
    }

    /// Rewrites append files with sequence numbers no greater than `watermark`.
    /// When `watermark` is `None`, rewrites the entire queue.
    pub fn must_rewrite_append_queue(
        &self,
        watermark: Option<FileSeq>,
        exit_after_step: Option<u64>,
    ) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        let (_, last) = self.pipe_log.file_span(LogQueue::Append);
        let watermark = watermark.map_or(last, |w| std::cmp::min(w, last));
        if watermark == last {
            self.pipe_log.rotate(LogQueue::Append).unwrap();
        }
        self.rewrite_append_queue_tombstones().unwrap();
        if exit_after_step == Some(1) {
            return;
        }
        self.rewrite_memtables(self.memtables.collect(|_| true), 0, Some(watermark))
            .unwrap();
        if exit_after_step == Some(2) {
            return;
        }
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Append,
            self.pipe_log.file_span(LogQueue::Append).1,
        )
        .unwrap();
    }

    pub fn must_rewrite_rewrite_queue(&self) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        self.rewrite_rewrite_queue().unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Rewrite,
            self.pipe_log.file_span(LogQueue::Rewrite).1,
        )
        .unwrap();
    }

    pub fn must_purge_all_stale(&self) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        self.pipe_log.rotate(LogQueue::Rewrite).unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Rewrite,
            self.pipe_log.file_span(LogQueue::Rewrite).1,
        )
        .unwrap();
        self.pipe_log.rotate(LogQueue::Append).unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Append,
            self.pipe_log.file_span(LogQueue::Append).1,
        )
        .unwrap();
    }

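    // Whether `queue` has grown enough to justify a rewrite pass: the append
    // queue is compared against `purge_threshold`; the rewrite queue must also
    // contain a sufficiently high ratio of compacted (garbage) entries.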
    pub(crate) fn needs_rewrite_log_files(&self, queue: LogQueue) -> bool {
        let (first_file, active_file) = self.pipe_log.file_span(queue);
        if active_file == first_file {
            return false;
        }

        let total_size = self.pipe_log.total_size(queue);
        match queue {
            LogQueue::Append => total_size > self.cfg.purge_threshold.0 as usize,
            LogQueue::Rewrite => {
                let compacted_rewrites_ratio = self.global_stats.deleted_rewrite_entries() as f64
                    / self.global_stats.rewrite_entries() as f64;
                total_size > self.cfg.purge_rewrite_threshold.unwrap().0 as usize
                    && compacted_rewrites_ratio > self.cfg.purge_rewrite_garbage_ratio
            }
        }
    }

    // Returns (rewrite_watermark, compact_watermark).
    // Regions with logs older than compact_watermark should be compacted;
    // regions with logs between compact_watermark and rewrite_watermark should
    // be rewritten.
    fn append_queue_watermarks(&self) -> (Option<FileSeq>, Option<FileSeq>) {
        let queue = LogQueue::Append;

        let (first_file, active_file) = self.pipe_log.file_span(queue);
        if active_file == first_file {
            // Can't rewrite or force compact the active file.
            return (None, None);
        }

        let rewrite_watermark = self.pipe_log.file_at(queue, REWRITE_RATIO);
        let compact_watermark = self.pipe_log.file_at(queue, FORCE_COMPACT_RATIO);
        debug_assert!(active_file - 1 > 0);
        (
            Some(std::cmp::min(rewrite_watermark, active_file - 1)),
            Some(std::cmp::min(compact_watermark, active_file - 1)),
        )
    }

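    // Examines regions that still hold logs behind the watermarks: heavy regions
    // are reported back for user compaction (and force rewritten after
    // `MAX_COUNT_BEFORE_FORCE_REWRITE` unanswered requests), while lighter
    // regions with entries behind the rewrite watermark are rewritten directly.
    // Returns the regions that should be compacted.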
    fn rewrite_or_compact_append_queue(
        &self,
        rewrite_watermark: FileSeq,
        compact_watermark: FileSeq,
        rewrite_candidates: &mut HashMap<u64, u32>,
    ) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_REWRITE_APPEND_DURATION_HISTOGRAM);
        debug_assert!(compact_watermark <= rewrite_watermark);
        let mut should_compact = Vec::with_capacity(16);

        let mut new_candidates = HashMap::with_capacity(rewrite_candidates.len());
        let memtables = self.memtables.collect(|t| {
            let min_append_seq = t.min_file_seq(LogQueue::Append).unwrap_or(u64::MAX);
            let old = min_append_seq < compact_watermark || t.rewrite_count() > 0;
            let has_something_to_rewrite = min_append_seq <= rewrite_watermark;
            let append_heavy = t.has_at_least_some_entries_before(
                FileId::new(LogQueue::Append, rewrite_watermark),
                MAX_REWRITE_ENTRIES_PER_REGION + t.rewrite_count(),
            );
            let full_heavy = t.has_at_least_some_entries_before(
                FileId::new(LogQueue::Append, rewrite_watermark),
                MAX_REWRITE_ENTRIES_PER_REGION,
            );
            // `compact_counter` is the number of times this region has been asked to
            // force compact.
            let compact_counter = rewrite_candidates.get(&t.region_id()).unwrap_or(&0);
            if old && full_heavy {
                if *compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE {
                    // Repeatedly ask the user to compact this heavy region.
                    should_compact.push(t.region_id());
                    new_candidates.insert(t.region_id(), *compact_counter + 1);
                    return false;
                } else {
                    // The user is unresponsive; do the rewrite ourselves.
                    should_compact.push(t.region_id());
                    return has_something_to_rewrite;
                }
            }
            !append_heavy && has_something_to_rewrite
        });

        self.rewrite_memtables(
            memtables,
            MAX_REWRITE_ENTRIES_PER_REGION,
            Some(rewrite_watermark),
        )?;
        *rewrite_candidates = new_candidates;

        Ok(should_compact)
    }

    // Rewrites the entire rewrite queue into new log files.
    fn rewrite_rewrite_queue(&self) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM);
        self.pipe_log.rotate(LogQueue::Rewrite)?;

        let mut force_compact_regions = vec![];
        let memtables = self.memtables.collect(|t| {
            // A region that has been force rewritten (and so carries many rewritten
            // entries) should also be asked to compact.
            if t.rewrite_count() > MAX_REWRITE_ENTRIES_PER_REGION {
                force_compact_regions.push(t.region_id());
            }
            t.min_file_seq(LogQueue::Rewrite).is_some()
        });

        self.rewrite_memtables(memtables, 0 /* expect_rewrites_per_memtable */, None)?;
        self.global_stats.reset_rewrite_counters();
        Ok(force_compact_regions)
    }

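    // Flushes the deletion marks (tombstones) of cleaned regions into the
    // rewrite queue so they survive the purge of their original append files.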
    fn rewrite_append_queue_tombstones(&self) -> Result<()> {
        let mut log_batch = self.memtables.take_cleaned_region_logs();
        self.rewrite_impl(
            &mut log_batch,
            None, /* rewrite_watermark */
            true, /* sync */
        )?;
        Ok(())
    }

    // Purges stale files of `queue` that are older than both `seq` (exclusive)
    // and the oldest file still referenced by any memtable.
    fn rescan_memtables_and_purge_stale_files(&self, queue: LogQueue, seq: FileSeq) -> Result<()> {
        let min_seq = self.memtables.fold(seq, |min, t| {
            t.min_file_seq(queue).map_or(min, |m| std::cmp::min(min, m))
        });

        let purged = self.pipe_log.purge_to(FileId {
            queue,
            seq: min_seq,
        })?;
        if purged > 0 {
            info!("purged {purged} expired log files for queue {queue:?}");
            for listener in &self.listeners {
                listener.post_purge(FileId {
                    queue,
                    seq: min_seq - 1,
                });
            }
        }
        Ok(())
    }

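    // Reads the entries referenced by `memtables` back from disk and rewrites
    // them, together with their key-values, into the rewrite queue in batches
    // of roughly `max_batch_bytes()` each.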
    fn rewrite_memtables(
        &self,
        memtables: Vec<MemTableHandle>,
        expect_rewrites_per_memtable: usize,
        rewrite: Option<FileSeq>,
    ) -> Result<()> {
        // Atomic groups are only needed when rewriting the rewrite queue (`rewrite`
        // is `None`).
        let needs_atomicity = (|| {
            fail_point!("force_use_atomic_group", |_| true);
            rewrite.is_none()
        })();
        let mut log_batch = LogBatch::default();
        for memtable in memtables {
            let mut entry_indexes = Vec::with_capacity(expect_rewrites_per_memtable);
            let mut kvs = Vec::new();
            let region_id = {
                let m = memtable.read();
                if let Some(rewrite) = rewrite {
                    m.fetch_entry_indexes_before(rewrite, &mut entry_indexes)?;
                    m.fetch_kvs_before(rewrite, &mut kvs);
                } else {
                    m.fetch_rewritten_entry_indexes(&mut entry_indexes)?;
                    m.fetch_rewritten_kvs(&mut kvs);
                }
                m.region_id()
            };

            let mut previous_size = log_batch.approximate_size();
            let mut atomic_group = None;
            let mut atomic_group_start = None;
            let mut current_entry_indexes = Vec::new();
            let mut current_entries = Vec::new();
            let mut current_size = 0;
            let mut unsynced_size = 0;
            // Split the entries into smaller chunks so that we don't OOM and the
            // compression overhead stays reasonable.
            let mut entry_indexes = entry_indexes.into_iter().peekable();
            while let Some(ei) = entry_indexes.next() {
                let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &ei)?;
                current_size += entry.len();
                unsynced_size += entry.len();
                current_entries.push(entry);
                current_entry_indexes.push(ei);
                // If this is the last chunk, it is flushed after the loop instead.
                if entry_indexes.peek().is_some()
                    && current_size + previous_size > max_batch_bytes()
                {
                    if needs_atomicity {
                        if previous_size > 0 {
                            // The previous Raft group and the current one cannot fit into
                            // one batch. Flush the previous data first so that the
                            // atomicity of the current group is not broken.
                            self.rewrite_impl(&mut log_batch, rewrite, false)?;
                            previous_size = 0;
                            if current_size <= max_batch_bytes() {
                                continue;
                            }
                        }
                        match atomic_group.as_mut() {
                            None => {
                                let mut g = AtomicGroupBuilder::default();
                                g.begin(&mut log_batch);
                                atomic_group = Some(g);
                            }
                            Some(g) => {
                                g.add(&mut log_batch);
                            }
                        }
                    }

                    log_batch.add_raw_entries(
                        region_id,
                        mem::take(&mut current_entry_indexes),
                        mem::take(&mut current_entries),
                    )?;
                    current_size = 0;
                    previous_size = 0;
                    let sync = if unsynced_size >= max_forcely_sync_bytes() {
                        // Don't accumulate too many unsynced bytes, otherwise the later
                        // `fdatasync` issued by the append path could be blocked for too
                        // long.
                        unsynced_size = 0;
                        true
                    } else {
                        false
                    };
                    let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
                    if needs_atomicity && atomic_group_start.is_none() {
                        atomic_group_start = Some(handle.id.seq);
                    }
                }
            }
            log_batch.add_raw_entries(region_id, current_entry_indexes, current_entries)?;
            for (k, v) in kvs {
                log_batch.put(region_id, k, v)?;
            }
            if let Some(g) = atomic_group.as_mut() {
                g.end(&mut log_batch);
                let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
                self.memtables.apply_rewrite_atomic_group(
                    region_id,
                    atomic_group_start.unwrap(),
                    handle.id.seq,
                );
            } else if log_batch.approximate_size() > max_batch_bytes() {
                self.rewrite_impl(&mut log_batch, rewrite, false)?;
            }
        }
        self.rewrite_impl(&mut log_batch, rewrite, true)?;
        Ok(())
    }

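    // Writes `log_batch` to the rewrite queue and applies the result to the
    // memtables. Returns the handle of the written block, or `None` if the
    // batch was empty.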
    fn rewrite_impl(
        &self,
        log_batch: &mut LogBatch,
        rewrite_watermark: Option<FileSeq>,
        sync: bool,
    ) -> Result<Option<FileBlockHandle>> {
        if log_batch.is_empty() {
            debug_assert!(sync);
            self.pipe_log.sync(LogQueue::Rewrite)?;
            return Ok(None);
        }
        log_batch.finish_populate(
            self.cfg.batch_compression_threshold.0 as usize,
            self.cfg.compression_level,
        )?;
        let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
        if sync {
            self.pipe_log.sync(LogQueue::Rewrite)?;
        }
        log_batch.finish_write(file_handle);
        self.memtables.apply_rewrite_writes(
            log_batch.drain(),
            rewrite_watermark,
            file_handle.id.seq,
        );
        for listener in &self.listeners {
            listener.post_apply_memtables(file_handle.id);
        }
        if rewrite_watermark.is_none() {
            BACKGROUND_REWRITE_BYTES
                .rewrite
                .observe(file_handle.len as f64);
        } else {
            BACKGROUND_REWRITE_BYTES
                .append
                .observe(file_handle.len as f64);
        }
        Ok(Some(file_handle))
    }
}

#[derive(Default)]
pub struct PurgeHook {
    // Append queue log files that are not yet fully applied to memtables must not
    // be purged even when not referenced by any memtable.
    // In order to identify them, maintain a per-file reference counter for all
    // active log files in the append queue. There is no need to track the rewrite
    // queue because it is only written by the purge thread.
    active_log_files: RwLock<VecDeque<(FileSeq, AtomicUsize)>>,
}

impl EventListener for PurgeHook {
    fn post_new_log_file(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let mut active_log_files = self.active_log_files.write();
            if let Some(seq) = active_log_files.back().map(|x| x.0) {
                assert_eq!(
                    seq + 1,
                    file_id.seq,
                    "active log files should be contiguous"
                );
            }
            let counter = AtomicUsize::new(0);
            active_log_files.push_back((file_id.seq, counter));
        }
    }

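    // A log batch has been appended to this file; hold a reference on it until
    // the batch is applied to memtables (see `post_apply_memtables`).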
    fn on_append_log_file(&self, handle: FileBlockHandle) {
        if handle.id.queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            let counter = &active_log_files[(handle.id.seq - front) as usize].1;
            counter.fetch_add(1, Ordering::Release);
        }
    }

    fn post_apply_memtables(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            let counter = &active_log_files[(file_id.seq - front) as usize].1;
            counter.fetch_sub(1, Ordering::Release);
        }
    }

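    // Returns the oldest append file that still has writes waiting to be
    // applied to memtables, if any.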
    fn first_file_not_ready_for_purge(&self, queue: LogQueue) -> Option<FileSeq> {
        if queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            for (id, counter) in active_log_files.iter() {
                if counter.load(Ordering::Acquire) > 0 {
                    return Some(*id);
                }
            }
        }
        None
    }

    fn post_purge(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let mut active_log_files = self.active_log_files.write();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            if front <= file_id.seq {
                let mut purged = active_log_files.drain(0..=(file_id.seq - front) as usize);
                assert_eq!(purged.next_back().unwrap().0, file_id.seq);
            }
        }
    }
}