value_log/gc/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod report;

use crate::{id::SegmentId, Compressor, ValueLog};

/// GC strategy
#[allow(clippy::module_name_repetitions)]
pub trait GcStrategy<C: Compressor + Clone> {
    /// Picks segments based on a predicate.
    fn pick(&self, value_log: &ValueLog<C>) -> Vec<SegmentId>;
}

/// Picks segments that have a certain percentage of stale blobs
pub struct StaleThresholdStrategy(f32);

impl StaleThresholdStrategy {
    /// Creates a new strategy with the given threshold.
    ///
    /// # Panics
    ///
    /// Panics if the ratio is invalid.
    #[must_use]
    pub fn new(ratio: f32) -> Self {
        assert!(
            ratio.is_finite() && ratio.is_sign_positive(),
            "invalid stale ratio"
        );
        Self(ratio.min(1.0))
    }
}

impl<C: Compressor + Clone> GcStrategy<C> for StaleThresholdStrategy {
    fn pick(&self, value_log: &ValueLog<C>) -> Vec<SegmentId> {
        value_log
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .filter(|x| x.stale_ratio() > self.0)
            .map(|x| x.id)
            .collect::<Vec<_>>()
    }
}

/// Tries to find a least-effort-selection of segments to merge to reach a certain space amplification
pub struct SpaceAmpStrategy(f32);

impl SpaceAmpStrategy {
    /// Creates a new strategy with the given space amp factor.
    ///
    /// # Panics
    ///
    /// Panics if the space amp factor is < 1.0.
    #[must_use]
    pub fn new(ratio: f32) -> Self {
        assert!(ratio >= 1.0, "invalid space amp ratio");
        Self(ratio)
    }
}

impl<C: Compressor + Clone> GcStrategy<C> for SpaceAmpStrategy {
    #[allow(clippy::cast_precision_loss, clippy::significant_drop_tightening)]
    fn pick(&self, value_log: &ValueLog<C>) -> Vec<SegmentId> {
        let space_amp_target = self.0;
        let current_space_amp = value_log.space_amp();

        if current_space_amp < space_amp_target {
            log::trace!("Space amp is <= target {space_amp_target}, nothing to do");
            vec![]
        } else {
            log::debug!("Selecting segments to GC, space_amp_target={space_amp_target}");

            let lock = value_log
                .manifest
                .segments
                .read()
                .expect("lock is poisoned");

            let mut segments = lock
                .values()
                .filter(|x| x.stale_ratio() > 0.0)
                .collect::<Vec<_>>();

            // Sort by stale ratio descending
            segments.sort_by(|a, b| {
                b.stale_ratio()
                    .partial_cmp(&a.stale_ratio())
                    .unwrap_or(std::cmp::Ordering::Equal)
            });

            let mut selection = vec![];

            let mut total_bytes = value_log.manifest.total_bytes();
            let mut stale_bytes = value_log.manifest.stale_bytes();

            for segment in segments {
                let segment_stale_bytes = segment.gc_stats.stale_bytes();
                stale_bytes -= segment_stale_bytes;
                total_bytes -= segment_stale_bytes;

                selection.push(segment.id);

                let space_amp_after_gc =
                    total_bytes as f32 / (total_bytes as f32 - stale_bytes as f32);

                log::debug!(
                    "Selected segment #{} for GC: will reduce space amp to {space_amp_after_gc}",
                    segment.id
                );

                if space_amp_after_gc <= space_amp_target {
                    break;
                }
            }

            selection
        }
    }
}