use std::collections::VecDeque;
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use fail::fail_point;
use log::{info, warn};
use parking_lot::{Mutex, RwLock};

use crate::config::Config;
use crate::engine::read_entry_bytes_from_file;
use crate::event_listener::EventListener;
use crate::log_batch::{AtomicGroupBuilder, LogBatch};
use crate::memtable::{MemTableHandle, MemTables};
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue, PipeLog};
use crate::{GlobalStats, Result};

// Regions holding data in the oldest 20% of the append queue may be asked to
// force compact.
const FORCE_COMPACT_RATIO: f64 = 0.2;
// Only entries residing in the oldest 70% of the append queue are rewritten.
const REWRITE_RATIO: f64 = 0.7;
// Regions with more stale entries than this are considered too heavy to
// rewrite and are asked to compact instead.
const MAX_REWRITE_ENTRIES_PER_REGION: usize = 32;
// A heavy region that still hasn't compacted after this many purge rounds is
// force rewritten anyway.
const MAX_COUNT_BEFORE_FORCE_REWRITE: u32 = 9;

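// Maximum size in bytes of a single rewrite batch. A fail point allows tests
// to override this value.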
fn max_batch_bytes() -> usize {
    fail_point!("max_rewrite_batch_bytes", |s| s
        .unwrap()
        .parse::<usize>()
        .unwrap());
    128 * 1024
}

// Threshold of accumulated unsynced rewrite bytes that forces an fsync.
fn max_forcely_sync_bytes() -> usize {
    max_batch_bytes() * 4
}

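/// Manages garbage collection of log files. Live entries in old files are
/// rewritten into the rewrite queue (or their regions are asked to compact),
/// after which the stale files can be purged from disk.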
pub struct PurgeManager<P>
where
    P: PipeLog,
{
    cfg: Arc<Config>,
    memtables: MemTables,
    pipe_log: Arc<P>,
    global_stats: Arc<GlobalStats>,
    listeners: Vec<Arc<dyn EventListener>>,

    // Also serves as a lock that allows only one purge to run at a time. Maps
    // a region id to the number of purge rounds in which it has been asked to
    // compact; once the count reaches `MAX_COUNT_BEFORE_FORCE_REWRITE`, the
    // region is force rewritten.
    force_rewrite_candidates: Arc<Mutex<HashMap<u64, u32>>>,
}

impl<P> PurgeManager<P>
where
    P: PipeLog,
{
    pub fn new(
        cfg: Arc<Config>,
        memtables: MemTables,
        pipe_log: Arc<P>,
        global_stats: Arc<GlobalStats>,
        listeners: Vec<Arc<dyn EventListener>>,
    ) -> PurgeManager<P> {
        PurgeManager {
            cfg,
            memtables,
            pipe_log,
            global_stats,
            listeners,
            force_rewrite_candidates: Arc::new(Mutex::new(HashMap::default())),
        }
    }

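    /// Rewrites and purges stale log files. Returns the ids of regions that
    /// should be compacted by the caller. Returns early with an empty list if
    /// another purge is already running.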
    pub fn purge_expired_files(&self) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_PURGE_DURATION_HISTOGRAM);
        let guard = self.force_rewrite_candidates.try_lock();
        if guard.is_none() {
            warn!("Unable to purge expired files: locked");
            return Ok(vec![]);
        }
        let mut rewrite_candidate_regions = guard.unwrap();

        let mut should_compact = HashSet::new();
        if self.needs_rewrite_log_files(LogQueue::Rewrite) {
            should_compact.extend(self.rewrite_rewrite_queue()?);
            self.rescan_memtables_and_purge_stale_files(
                LogQueue::Rewrite,
                self.pipe_log.file_span(LogQueue::Rewrite).1,
            )?;
        }

        if self.needs_rewrite_log_files(LogQueue::Append) {
            if let (Some(rewrite_watermark), Some(compact_watermark)) =
                self.append_queue_watermarks()
            {
                let (first_append, latest_append) = self.pipe_log.file_span(LogQueue::Append);
                // Append files at or after this barrier may still have writes that are
                // not yet applied to memtables, so purging must stop before it.
                let append_queue_barrier =
                    self.listeners.iter().fold(latest_append, |barrier, l| {
                        l.first_file_not_ready_for_purge(LogQueue::Append)
                            .map_or(barrier, |f| std::cmp::min(f, barrier))
                    });

                // Ordering matters: tombstones must be rewritten after the barrier is
                // collected and before live entries are rewritten, otherwise deletion
                // marks could be lost or reordered across a restart.
                self.rewrite_append_queue_tombstones()?;
                should_compact.extend(self.rewrite_or_compact_append_queue(
                    rewrite_watermark,
                    compact_watermark,
                    &mut rewrite_candidate_regions,
                )?);

                if append_queue_barrier == first_append && first_append < latest_append {
                    warn!("Unable to purge expired files: blocked by barrier");
                }
                self.rescan_memtables_and_purge_stale_files(
                    LogQueue::Append,
                    append_queue_barrier,
                )?;
            }
        }
        Ok(should_compact.into_iter().collect())
    }

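    // Forcibly rewrites the append queue up to `watermark` (or entirely when
    // `None`). `exit_after_step` aborts after the tombstone rewrite (step 1) or
    // the memtable rewrite (step 2), which is mainly useful for exercising
    // partially completed rewrites in tests.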
    pub fn must_rewrite_append_queue(
        &self,
        watermark: Option<FileSeq>,
        exit_after_step: Option<u64>,
    ) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        let (_, last) = self.pipe_log.file_span(LogQueue::Append);
        let watermark = watermark.map_or(last, |w| std::cmp::min(w, last));
        if watermark == last {
            self.pipe_log.rotate(LogQueue::Append).unwrap();
        }
        self.rewrite_append_queue_tombstones().unwrap();
        if exit_after_step == Some(1) {
            return;
        }
        self.rewrite_memtables(self.memtables.collect(|_| true), 0, Some(watermark))
            .unwrap();
        if exit_after_step == Some(2) {
            return;
        }
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Append,
            self.pipe_log.file_span(LogQueue::Append).1,
        )
        .unwrap();
    }

    pub fn must_rewrite_rewrite_queue(&self) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        self.rewrite_rewrite_queue().unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Rewrite,
            self.pipe_log.file_span(LogQueue::Rewrite).1,
        )
        .unwrap();
    }

    pub fn must_purge_all_stale(&self) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        self.pipe_log.rotate(LogQueue::Rewrite).unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Rewrite,
            self.pipe_log.file_span(LogQueue::Rewrite).1,
        )
        .unwrap();
        self.pipe_log.rotate(LogQueue::Append).unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Append,
            self.pipe_log.file_span(LogQueue::Append).1,
        )
        .unwrap();
    }

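    // Returns true if the queue has grown past its purge threshold; the
    // rewrite queue additionally requires a sufficient ratio of deleted
    // entries.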
    pub(crate) fn needs_rewrite_log_files(&self, queue: LogQueue) -> bool {
        let (first_file, active_file) = self.pipe_log.file_span(queue);
        if active_file == first_file {
            return false;
        }

        let total_size = self.pipe_log.total_size(queue);
        match queue {
            LogQueue::Append => total_size > self.cfg.purge_threshold.0 as usize,
            LogQueue::Rewrite => {
                let compacted_rewrites_ratio = self.global_stats.deleted_rewrite_entries() as f64
                    / self.global_stats.rewrite_entries() as f64;
                total_size > self.cfg.purge_rewrite_threshold.unwrap().0 as usize
                    && compacted_rewrites_ratio > self.cfg.purge_rewrite_garbage_ratio
            }
        }
    }

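    // Returns (rewrite_watermark, compact_watermark) for the append queue.
    // Regions holding data older than `compact_watermark` are preferably
    // compacted; remaining data older than `rewrite_watermark` is rewritten.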
    fn append_queue_watermarks(&self) -> (Option<FileSeq>, Option<FileSeq>) {
        let queue = LogQueue::Append;

        let (first_file, active_file) = self.pipe_log.file_span(queue);
        if active_file == first_file {
            // Can't rewrite or force compact the active file.
            return (None, None);
        }

        let rewrite_watermark = self.pipe_log.file_at(queue, REWRITE_RATIO);
        let compact_watermark = self.pipe_log.file_at(queue, FORCE_COMPACT_RATIO);
        debug_assert!(active_file - 1 > 0);
        (
            Some(std::cmp::min(rewrite_watermark, active_file - 1)),
            Some(std::cmp::min(compact_watermark, active_file - 1)),
        )
    }

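    // Decides, for each region with data below `rewrite_watermark`, whether to
    // rewrite its entries into the rewrite queue or to ask the caller to
    // compact the region instead. Returns the region ids to compact.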
    fn rewrite_or_compact_append_queue(
        &self,
        rewrite_watermark: FileSeq,
        compact_watermark: FileSeq,
        rewrite_candidates: &mut HashMap<u64, u32>,
    ) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_REWRITE_APPEND_DURATION_HISTOGRAM);
        debug_assert!(compact_watermark <= rewrite_watermark);
        let mut should_compact = Vec::with_capacity(16);

        let mut new_candidates = HashMap::with_capacity(rewrite_candidates.len());
        let memtables = self.memtables.collect(|t| {
            let min_append_seq = t.min_file_seq(LogQueue::Append).unwrap_or(u64::MAX);
            let old = min_append_seq < compact_watermark || t.rewrite_count() > 0;
            let has_something_to_rewrite = min_append_seq <= rewrite_watermark;
            let append_heavy = t.has_at_least_some_entries_before(
                FileId::new(LogQueue::Append, rewrite_watermark),
                MAX_REWRITE_ENTRIES_PER_REGION + t.rewrite_count(),
            );
            let full_heavy = t.has_at_least_some_entries_before(
                FileId::new(LogQueue::Append, rewrite_watermark),
                MAX_REWRITE_ENTRIES_PER_REGION,
            );
            // How many times this region has already been asked to compact.
            let compact_counter = rewrite_candidates.get(&t.region_id()).unwrap_or(&0);
            if old && full_heavy {
                if *compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE {
                    // Keep asking the caller to compact this heavy region.
                    should_compact.push(t.region_id());
                    new_candidates.insert(t.region_id(), *compact_counter + 1);
                    return false;
                } else {
                    // The region failed to compact in time; force rewrite it.
                    should_compact.push(t.region_id());
                    return has_something_to_rewrite;
                }
            }
            !append_heavy && has_something_to_rewrite
        });

        self.rewrite_memtables(
            memtables,
            MAX_REWRITE_ENTRIES_PER_REGION,
            Some(rewrite_watermark),
        )?;
        *rewrite_candidates = new_candidates;

        Ok(should_compact)
    }

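    // Rewrites the live content of the rewrite queue into fresh files,
    // dropping entries that have since been deleted. Returns regions that have
    // accumulated too many rewritten entries and should be compacted.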
    fn rewrite_rewrite_queue(&self) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM);
        self.pipe_log.rotate(LogQueue::Rewrite)?;

        let mut force_compact_regions = vec![];
        let memtables = self.memtables.collect(|t| {
            if t.rewrite_count() > MAX_REWRITE_ENTRIES_PER_REGION {
                // Too many rewritten entries for this region; ask the caller to
                // compact it as well.
                force_compact_regions.push(t.region_id());
            }
            t.min_file_seq(LogQueue::Rewrite).is_some()
        });

        self.rewrite_memtables(memtables, 0 /* expect_rewrites_per_memtable */, None)?;
        self.global_stats.reset_rewrite_counters();
        Ok(force_compact_regions)
    }

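    // Rewrites tombstones of cleaned regions into the rewrite queue so that
    // the append files holding them can be purged.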
    fn rewrite_append_queue_tombstones(&self) -> Result<()> {
        let mut log_batch = self.memtables.take_cleaned_region_logs();
        self.rewrite_impl(
            &mut log_batch,
            None, // rewrite_watermark
            true, // sync
        )?;
        Ok(())
    }

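    // Purges files of `queue` older than `seq`, unless a memtable still
    // references an older file. Notifies listeners when files are removed.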
    fn rescan_memtables_and_purge_stale_files(&self, queue: LogQueue, seq: FileSeq) -> Result<()> {
        let min_seq = self.memtables.fold(seq, |min, t| {
            t.min_file_seq(queue).map_or(min, |m| std::cmp::min(min, m))
        });

        let purged = self.pipe_log.purge_to(FileId {
            queue,
            seq: min_seq,
        })?;
        if purged > 0 {
            info!("purged {purged} expired log files for queue {queue:?}");
            for listener in &self.listeners {
                listener.post_purge(FileId {
                    queue,
                    seq: min_seq - 1,
                });
            }
        }
        Ok(())
    }

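    // Rewrites the entries and key-values of the given memtables into the
    // rewrite queue, reading entries back from their current files and
    // re-appending them in chunks of at most `max_batch_bytes()`. When no
    // `rewrite` watermark is given (i.e. the rewrite queue itself is being
    // rewritten), a region spanning multiple batches is wrapped in an atomic
    // group so that recovery applies it all or not at all.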
    fn rewrite_memtables(
        &self,
        memtables: Vec<MemTableHandle>,
        expect_rewrites_per_memtable: usize,
        rewrite: Option<FileSeq>,
    ) -> Result<()> {
        let needs_atomicity = (|| {
            // Only rewrites of the rewrite queue (no watermark) use atomic
            // groups; a fail point can force them on for tests.
            fail_point!("force_use_atomic_group", |_| true);
            rewrite.is_none()
        })();
        let mut log_batch = LogBatch::default();
        for memtable in memtables {
            let mut entry_indexes = Vec::with_capacity(expect_rewrites_per_memtable);
            let mut kvs = Vec::new();
            let region_id = {
                let m = memtable.read();
                if let Some(rewrite) = rewrite {
                    m.fetch_entry_indexes_before(rewrite, &mut entry_indexes)?;
                    m.fetch_kvs_before(rewrite, &mut kvs);
                } else {
                    m.fetch_rewritten_entry_indexes(&mut entry_indexes)?;
                    m.fetch_rewritten_kvs(&mut kvs);
                }
                m.region_id()
            };

            let mut previous_size = log_batch.approximate_size();
            let mut atomic_group = None;
            let mut atomic_group_start = None;
            let mut current_entry_indexes = Vec::new();
            let mut current_entries = Vec::new();
            let mut current_size = 0;
            let mut unsynced_size = 0;
            // Split the entries into chunks so a single batch never grows far
            // past `max_batch_bytes()`.
            let mut entry_indexes = entry_indexes.into_iter().peekable();
            while let Some(ei) = entry_indexes.next() {
                let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &ei)?;
                current_size += entry.len();
                current_entries.push(entry);
                current_entry_indexes.push(ei);
                unsynced_size += current_size;
                // The last chunk of a region is flushed outside this loop.
                if entry_indexes.peek().is_some()
                    && current_size + previous_size > max_batch_bytes()
                {
                    if needs_atomicity {
                        if previous_size > 0 {
                            // Flush data from other regions first, so this region's
                            // chunks can form an atomic group of their own.
                            self.rewrite_impl(&mut log_batch, rewrite, false)?;
                            previous_size = 0;
                            if current_size <= max_batch_bytes() {
                                continue;
                            }
                        }
                        match atomic_group.as_mut() {
                            None => {
                                let mut g = AtomicGroupBuilder::default();
                                g.begin(&mut log_batch);
                                atomic_group = Some(g);
                            }
                            Some(g) => {
                                g.add(&mut log_batch);
                            }
                        }
                    }

                    log_batch.add_raw_entries(
                        region_id,
                        mem::take(&mut current_entry_indexes),
                        mem::take(&mut current_entries),
                    )?;
                    current_size = 0;
                    previous_size = 0;
                    // Sync periodically so the amount of unsynced rewrite data
                    // stays bounded.
                    let sync = if unsynced_size >= max_forcely_sync_bytes() {
                        unsynced_size = 0;
                        true
                    } else {
                        false
                    };
                    let handle = self.rewrite_impl(&mut log_batch, rewrite, sync)?.unwrap();
                    if needs_atomicity && atomic_group_start.is_none() {
                        atomic_group_start = Some(handle.id.seq);
                    }
                }
            }
            log_batch.add_raw_entries(region_id, current_entry_indexes, current_entries)?;
            for (k, v) in kvs {
                log_batch.put(region_id, k, v)?;
            }
            if let Some(g) = atomic_group.as_mut() {
                g.end(&mut log_batch);
                let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
                self.memtables.apply_rewrite_atomic_group(
                    region_id,
                    atomic_group_start.unwrap(),
                    handle.id.seq,
                );
            } else if log_batch.approximate_size() > max_batch_bytes() {
                self.rewrite_impl(&mut log_batch, rewrite, false)?;
            }
        }
        self.rewrite_impl(&mut log_batch, rewrite, true)?;
        Ok(())
    }

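    // Appends `log_batch` to the rewrite queue, applies the result to the
    // memtables and notifies listeners. An empty batch only triggers a sync.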
    fn rewrite_impl(
        &self,
        log_batch: &mut LogBatch,
        rewrite_watermark: Option<FileSeq>,
        sync: bool,
    ) -> Result<Option<FileBlockHandle>> {
        if log_batch.is_empty() {
            debug_assert!(sync);
            self.pipe_log.sync(LogQueue::Rewrite)?;
            return Ok(None);
        }
        log_batch.finish_populate(
            self.cfg.batch_compression_threshold.0 as usize,
            self.cfg.compression_level,
        )?;
        let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
        if sync {
            self.pipe_log.sync(LogQueue::Rewrite)?;
        }
        log_batch.finish_write(file_handle);
        self.memtables.apply_rewrite_writes(
            log_batch.drain(),
            rewrite_watermark,
            file_handle.id.seq,
        );
        for listener in &self.listeners {
            listener.post_apply_memtables(file_handle.id);
        }
        if rewrite_watermark.is_none() {
            BACKGROUND_REWRITE_BYTES
                .rewrite
                .observe(file_handle.len as f64);
        } else {
            BACKGROUND_REWRITE_BYTES
                .append
                .observe(file_handle.len as f64);
        }
        Ok(Some(file_handle))
    }
}

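/// An [`EventListener`] that tracks in-flight writes to active append files so
/// that files whose writes are not yet applied to memtables are not purged.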
#[derive(Default)]
pub struct PurgeHook {
    // Active append queue files, each paired with a counter of writes that
    // have been appended but not yet applied to memtables. Files with a
    // non-zero counter (and everything after them) are skipped when purging.
    active_log_files: RwLock<VecDeque<(FileSeq, AtomicUsize)>>,
}

impl EventListener for PurgeHook {
    fn post_new_log_file(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let mut active_log_files = self.active_log_files.write();
            if let Some(seq) = active_log_files.back().map(|x| x.0) {
                assert_eq!(
                    seq + 1,
                    file_id.seq,
                    "active log files should be contiguous"
                );
            }
            let counter = AtomicUsize::new(0);
            active_log_files.push_back((file_id.seq, counter));
        }
    }

    fn on_append_log_file(&self, handle: FileBlockHandle) {
        if handle.id.queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            let counter = &active_log_files[(handle.id.seq - front) as usize].1;
            counter.fetch_add(1, Ordering::Release);
        }
    }

    fn post_apply_memtables(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            let counter = &active_log_files[(file_id.seq - front) as usize].1;
            counter.fetch_sub(1, Ordering::Release);
        }
    }

    fn first_file_not_ready_for_purge(&self, queue: LogQueue) -> Option<FileSeq> {
        if queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            for (id, counter) in active_log_files.iter() {
                if counter.load(Ordering::Acquire) > 0 {
                    return Some(*id);
                }
            }
        }
        None
    }

    fn post_purge(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let mut active_log_files = self.active_log_files.write();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            if front <= file_id.seq {
                let mut purged = active_log_files.drain(0..=(file_id.seq - front) as usize);
                assert_eq!(purged.next_back().unwrap().0, file_id.seq);
            }
        }
    }
}