1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::ffi::CStr;

use libc::{self, c_char, c_void};

use crate::{
    compaction_filter::{self, CompactionFilter},
    ffi,
};

/// Each compaction will create a new CompactionFilter allowing the
/// application to know about different compactions.
///
///  See [compaction_filter::CompactionFilter][CompactionFilter] and
///  [Options::set_compaction_filter_factory][set_compaction_filter_factory]
///  for more details
///
///  [CompactionFilter]: ../compaction_filter/trait.CompactionFilter.html
///  [set_compaction_filter_factory]: ../struct.Options.html#method.set_compaction_filter_factory
pub trait CompactionFilterFactory {
    type Filter: CompactionFilter;

    /// Returns a CompactionFilter for the compaction process
    fn create(&mut self, context: CompactionFilterContext) -> Self::Filter;

    /// Returns a name that identifies this compaction filter factory.
    fn name(&self) -> &CStr;
}

pub unsafe extern "C" fn destructor_callback<F>(raw_self: *mut c_void)
where
    F: CompactionFilterFactory,
{
    drop(Box::from_raw(raw_self as *mut F));
}

pub unsafe extern "C" fn name_callback<F>(raw_self: *mut c_void) -> *const c_char
where
    F: CompactionFilterFactory,
{
    let self_ = &*(raw_self as *const c_void as *const F);
    self_.name().as_ptr()
}

/// Context information of a compaction run
pub struct CompactionFilterContext {
    /// Does this compaction run include all data files
    pub is_full_compaction: bool,
    /// Is this compaction requested by the client (true),
    /// or is it occurring as an automatic compaction process
    pub is_manual_compaction: bool,
}

impl CompactionFilterContext {
    unsafe fn from_raw(ptr: *mut ffi::rocksdb_compactionfiltercontext_t) -> Self {
        let is_full_compaction = ffi::rocksdb_compactionfiltercontext_is_full_compaction(ptr) != 0;
        let is_manual_compaction =
            ffi::rocksdb_compactionfiltercontext_is_manual_compaction(ptr) != 0;

        Self {
            is_full_compaction,
            is_manual_compaction,
        }
    }
}

pub unsafe extern "C" fn create_compaction_filter_callback<F>(
    raw_self: *mut c_void,
    context: *mut ffi::rocksdb_compactionfiltercontext_t,
) -> *mut ffi::rocksdb_compactionfilter_t
where
    F: CompactionFilterFactory,
{
    let self_ = &mut *(raw_self as *mut F);
    let context = CompactionFilterContext::from_raw(context);
    let filter = Box::new(self_.create(context));

    let filter_ptr = Box::into_raw(filter);

    ffi::rocksdb_compactionfilter_create(
        filter_ptr as *mut c_void,
        Some(compaction_filter::destructor_callback::<F::Filter>),
        Some(compaction_filter::filter_callback::<F::Filter>),
        Some(compaction_filter::name_callback::<F::Filter>),
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compaction_filter::Decision;
    use crate::{Options, DB};
    use std::ffi::CString;

    struct CountFilter(u16, CString);
    impl CompactionFilter for CountFilter {
        fn filter(&mut self, _level: u32, _key: &[u8], _value: &[u8]) -> crate::CompactionDecision {
            self.0 += 1;
            if self.0 > 2 {
                Decision::Remove
            } else {
                Decision::Keep
            }
        }

        fn name(&self) -> &CStr {
            &self.1
        }
    }

    struct TestFactory(CString);
    impl CompactionFilterFactory for TestFactory {
        type Filter = CountFilter;

        fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
            CountFilter(0, CString::new("CountFilter").unwrap())
        }

        fn name(&self) -> &CStr {
            &self.0
        }
    }

    #[test]
    fn compaction_filter_factory_test() {
        let path = "_rust_rocksdb_filter_factory_test";
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.set_compaction_filter_factory(TestFactory(CString::new("TestFactory").unwrap()));
        {
            let db = DB::open(&opts, path).unwrap();
            let _r = db.put(b"k1", b"a");
            let _r = db.put(b"_rk", b"b");
            let _r = db.put(b"%k", b"c");
            db.compact_range(None::<&[u8]>, None::<&[u8]>);
            assert_eq!(db.get(b"%k1").unwrap(), None);
        }
        let result = DB::destroy(&opts, path);
        assert!(result.is_ok());
    }
}