1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
//! `CompactionFilter` allows an application to modify/delete a key-value at
//! the time of compaction.

use std::os::raw::{c_char, c_int};

use rocks_sys as ll;

/// Kind of entry handed to `CompactionFilter::filter`.
///
/// The enum is `#[repr(C)]` with explicit discriminants because values of this
/// type arrive as an argument of the `extern "C"` function
/// `c::rust_compaction_filter_call`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
pub enum ValueType {
    /// A normal value (e.g. written with `Put()`).
    Value = 0,
    /// A merge operand (written with `Merge()`).
    MergeOperand = 1,
}

/// Outcome of `CompactionFilter::filter` for a single key-value pair or merge
/// operand.
///
/// `Clone`, `PartialEq` and `Eq` are derived so that decisions can be stored,
/// duplicated and compared (for example in unit tests of a filter).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Decision {
    /// Keep the key-value pair.
    Keep,
    /// Remove the key-value pair or merge operand.
    Remove,
    /// Keep the key and change the value/operand to the given bytes.
    ChangeValue(Vec<u8>),
    /// Remove this key-value pair, and also remove all key-value pairs with
    /// key in `[key, skip_until)`, where `skip_until` is the given bytes.
    RemoveAndSkipUntil(Vec<u8>),
}

impl Decision {
    // to C Decision type
    fn to_c(&self) -> c_int {
        match *self {
            Decision::Keep => 0,
            Decision::Remove => 1,
            Decision::ChangeValue(_) => 2,
            Decision::RemoveAndSkipUntil(_) => 3,
        }
    }
}

/// `CompactionFilter` allows an application to modify/delete a key-value at
/// the time of compaction.
///
/// Only the extended API is exposed: `filter` is called for both values and
/// merge operands and replaces the separate `Filter()` /
/// `FilterMergeOperand()` callbacks of the C++ interface.
///
/// Key-values that are results of a merge operation during compaction are not
/// passed into the filter. Currently, when you have a mix of `Put()`s and
/// `Merge()`s on a same key, only the merge operands are guaranteed to be
/// processed through the compaction filter. `Put()`s might be processed, or
/// might not.
///
/// # Threading
///
/// If multithreaded compaction is being used *and* a single `CompactionFilter`
/// instance was supplied via `Options::compaction_filter`, `filter` may be
/// called from different threads concurrently. The application must ensure
/// that the call is thread-safe.
///
/// If the `CompactionFilter` was created by a factory, then it will only ever
/// be used by a single thread that is doing the compaction run, and the call
/// does not need to be thread-safe. However, multiple filters may be in
/// existence and operating concurrently.
///
/// The previous paragraph is not true if you set `max_subcompactions` to more
/// than 1. In that case, subcompactions from multiple threads may call a
/// single `CompactionFilter` concurrently.
pub trait CompactionFilter {
    /// Decides what happens to one entry that is being compacted.
    ///
    /// Called for both values and merge operands; allows changing the value
    /// and skipping ranges of keys. `value_type` indicates whether this
    /// key-value corresponds to a normal value (e.g. written with `Put()`) or
    /// a merge operand (written with `Merge()`).
    ///
    /// Possible return values:
    ///  * `Decision::Keep` - keep the key-value pair.
    ///  * `Decision::Remove` - remove the key-value pair or merge operand.
    ///  * `Decision::ChangeValue(new_value)` - keep the key and change the
    ///    value/operand to `new_value`.
    ///  * `Decision::RemoveAndSkipUntil(skip_until)` - remove this key-value
    ///    pair, and also remove all key-value pairs with key in
    ///    `[key, skip_until)`. This range of keys will be skipped without
    ///    reading, potentially saving some IO operations compared to removing
    ///    the keys one by one.
    ///
    ///    `skip_until <= key` is treated the same as `Decision::Keep`
    ///    (since the range `[key, skip_until)` is empty).
    ///
    ///    The keys are skipped even if there are snapshots containing them,
    ///    as if `ignore_snapshots()` was true; i.e. values removed by
    ///    `RemoveAndSkipUntil` can disappear from a snapshot - beware if
    ///    you're using TransactionDB or `DB::GetSnapshot()`.
    ///
    ///    Another warning: if the value for a key was overwritten or merged
    ///    into (multiple `Put()`s or `Merge()`s), and the compaction filter
    ///    skips this key with `RemoveAndSkipUntil`, it's possible that it
    ///    will remove only the new value, exposing the old value that was
    ///    supposed to be overwritten.
    ///
    ///    If you use `RemoveAndSkipUntil`, consider also reducing the
    ///    `compaction_readahead_size` option.
    ///
    /// Note: If you are using a TransactionDB, it is not recommended to filter
    /// out or modify merge operands (`ValueType::MergeOperand`).
    /// If a merge operation is filtered out, TransactionDB may not realize
    /// there is a write conflict and may allow a Transaction to Commit that
    /// should have failed. Instead, it is better to implement any Merge
    /// filtering inside the MergeOperator.
    ///
    /// The default implementation keeps every entry.
    // The default ignores its arguments on purpose; the names stay unprefixed
    // so the generated documentation shows meaningful parameter names.
    #[allow(unused_variables)]
    fn filter(&mut self, level: i32, key: &[u8], value_type: ValueType, existing_value: &[u8]) -> Decision {
        Decision::Keep
    }

    /// This function is deprecated. Snapshots will always be ignored for
    /// compaction filters, because we realized that not ignoring snapshots
    /// doesn't provide the guarantee we initially thought it would provide.
    /// Repeatable reads will not be guaranteed anyway. If you override the
    /// function and return false, we will fail the compaction.
    fn ignore_snapshots(&self) -> bool {
        true
    }

    /// Returns a name that identifies this compaction filter.
    /// The name will be printed to LOG file on start up for diagnosis.
    ///
    /// The default name ends with a NUL byte. `c::rust_compaction_filter_name`
    /// returns the string's raw pointer as a `*const c_char` without copying
    /// it or appending a terminator, so an overriding implementation should
    /// presumably keep the trailing `\0` as well.
    fn name(&self) -> &str {
        "RustCompactionFilterV2\0"
    }
}

/// Factory that hands out a new `CompactionFilter` for each compaction,
/// allowing the application to know about different compactions.
pub trait CompactionFilterFactory {
    /// Builds the filter for the compaction run described by `context`.
    fn create_compaction_filter(&self, context: &Context) -> Box<dyn CompactionFilter>;

    /// Name that identifies this compaction filter factory.
    ///
    /// The default value carries a trailing NUL byte.
    fn name(&self) -> &str {
        "RustCompactionFilterFactory\0"
    }
}

/// Context information of a compaction run
///
/// Passed by reference to
/// `CompactionFilterFactory::create_compaction_filter`. The struct is plain
/// data, so the common value traits are derived to make it printable,
/// copyable and comparable.
// NOTE(review): `#[repr(C)]` suggests the layout mirrors a struct on the C
// side; keep field order and types in sync with it.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Context {
    /// Does this compaction run include all data files
    pub is_full_compaction: bool,
    /// Is this compaction requested by the client (true),
    /// or is it occurring as an automatic compaction process
    pub is_manual_compaction: bool,
    /// Which column family this compaction is for.
    pub column_family_id: u32,
}

/// C ABI entry points through which the C side calls into a Rust
/// `CompactionFilter`.
///
/// In every function `f` is the address of a heap-allocated (boxed) reference
/// `&(dyn CompactionFilter + Sync)`: that is the type
/// `rust_compaction_filter_drop` reconstructs and frees.
#[doc(hidden)]
pub mod c {
    use super::*;

    /// Runs the filter behind `f` on one entry and reports its decision.
    ///
    /// Return codes (must be the same as the C part):
    ///
    /// * `0` - keep,
    /// * `1` - remove,
    /// * `2` - change value; the new bytes are assigned to `new_value`,
    /// * `3` - remove and skip; the bytes are assigned to `skip_until`.
    ///
    /// # Safety
    ///
    /// * `f` must be non-null (asserted) and point to a live boxed filter
    ///   reference as described on this module.
    /// * `key` and `existing_value` must be valid for reads as `&[u8]`.
    ///   NOTE(review): the C side passes `*Slice` here, which presumably
    ///   relies on `Slice` sharing the layout of a Rust slice reference -
    ///   confirm against the C part.
    /// * `new_value` and `skip_until` must be pointers accepted by
    ///   `ll::cxx_string_assign` (`*std::string` on the C side).
    #[no_mangle]
    pub unsafe extern "C" fn rust_compaction_filter_call(
        f: *mut (),
        level: c_int,
        key: &&[u8], // *Slice
        value_type: ValueType,
        existing_value: &&[u8], // *Slice
        new_value: *mut (),     // *std::string
        skip_until: *mut (),
    ) -> c_int {
        assert!(!f.is_null());
        // FIXME: borrow as mutable
        // The handle stores a shared reference (see `rust_compaction_filter_drop`)
        // but `CompactionFilter::filter` takes `&mut self`, hence this cast.
        let filter = f as *mut &mut (dyn CompactionFilter + Sync);
        let decision = (*filter).filter(level, key, value_type, existing_value);
        // must be the same as C part
        match decision {
            Decision::Keep => 0,
            Decision::Remove => 1,
            Decision::ChangeValue(nval) => {
                ll::cxx_string_assign(new_value as *mut _, nval.as_ptr() as *const _, nval.len());
                2
            },
            Decision::RemoveAndSkipUntil(skip) => {
                ll::cxx_string_assign(skip_until as *mut _, skip.as_ptr() as *const _, skip.len());
                3
            },
        }
    }

    /// Releases the boxed filter reference behind `f`.
    ///
    /// Only the box holding the reference is freed; the filter it refers to
    /// is not owned by the box.
    ///
    /// # Safety
    ///
    /// `f` must be non-null (asserted), must have been produced by leaking a
    /// `Box<&(dyn CompactionFilter + Sync)>`, and must not be used afterwards.
    #[no_mangle]
    pub unsafe extern "C" fn rust_compaction_filter_drop(f: *mut ()) {
        assert!(!f.is_null());
        let filter = f as *mut &(dyn CompactionFilter + Sync);
        // `Box::from_raw` is `#[must_use]`: drop explicitly to state the intent.
        drop(Box::from_raw(filter));
    }

    /// Returns a pointer to the bytes of the filter's `name()`.
    ///
    /// The pointer is the string's own storage: nothing is copied and no
    /// terminator is appended, so the name itself has to end with `\0` for
    /// the result to be usable as a C string.
    ///
    /// # Safety
    ///
    /// `f` must be non-null (asserted) and point to a live boxed filter
    /// reference as described on this module.
    #[no_mangle]
    pub unsafe extern "C" fn rust_compaction_filter_name(f: *mut ()) -> *const c_char {
        assert!(!f.is_null());
        let filter = f as *mut &(dyn CompactionFilter + Sync);
        (*filter).name().as_ptr() as _
    }

    /// Returns the filter's `ignore_snapshots()` flag as a C character
    /// (`1` for true, `0` for false).
    ///
    /// # Safety
    ///
    /// `f` must be non-null (asserted) and point to a live boxed filter
    /// reference as described on this module.
    #[no_mangle]
    pub unsafe extern "C" fn rust_compaction_filter_ignore_snapshots(f: *mut ()) -> c_char {
        assert!(!f.is_null());
        let filter = f as *mut &(dyn CompactionFilter + Sync);
        (*filter).ignore_snapshots() as _
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::rocksdb::*;

    /// Filter exercising every `Decision` variant:
    ///
    /// * value `TO-BE-DELETED` -> `Remove`,
    /// * value `an-typo-in-value` -> `ChangeValue("a-typo-not-in-value")`,
    /// * key `key-0` -> `RemoveAndSkipUntil("key-5")`,
    /// * anything else -> `Keep`.
    pub struct MyCompactionFilter;

    impl CompactionFilter for MyCompactionFilter {
        fn filter(&mut self, _level: i32, key: &[u8], value_type: ValueType, existing_value: &[u8]) -> Decision {
            assert_eq!(value_type, ValueType::Value); // haven't set up merge test

            if existing_value == b"TO-BE-DELETED" {
                Decision::Remove
            } else if existing_value == b"an-typo-in-value" {
                Decision::ChangeValue(b"a-typo-not-in-value".to_vec())
            } else if key == b"key-0" {
                Decision::RemoveAndSkipUntil(b"key-5".to_vec())
            } else {
                Decision::Keep
            }
        }
    }

    // `MyCompactionFilter` is a unit struct, so it can be built in a constant
    // expression: a plain `static` is enough, no lazy initialization needed.
    static MY_COMPACTION_FILTER: MyCompactionFilter = MyCompactionFilter;

    #[test]
    fn compaction_filter() {
        let tmp_dir = ::tempdir::TempDir::new_in(".", "rocks").unwrap();
        let db = DB::open(
            Options::default()
                .map_db_options(|db| db.create_if_missing(true))
                .map_cf_options(|cf| cf.compaction_filter(&MY_COMPACTION_FILTER)),
            &tmp_dir,
        )
        .unwrap();

        let write_opts = WriteOptions::default();
        let read_opts = ReadOptions::default();

        println!("compact and try remove range");
        // `key-0` triggers `RemoveAndSkipUntil("key-5")`: `key-0` ..= `key-4`
        // fall into the skipped range, `key-5` ..= `key-8` will be preserved.
        for n in 0..9 {
            let key = format!("key-{}", n);
            assert!(db.put(&write_opts, key.as_bytes(), b"23333").is_ok());
        }

        println!("compact and delete");
        assert!(db.put(&write_opts, b"will-delete-me", b"TO-BE-DELETED").is_ok());

        println!("compact and change value");
        assert!(db.put(&write_opts, b"will-fix-me", b"an-typo-in-value").is_ok());

        // now compact full range
        let ret = db.compact_range(&Default::default(), ..);
        assert!(ret.is_ok(), "error: {:?}", ret);

        // Entries dropped by the filter must report "not found";
        // `unwrap_err` already fails the test if the key is still present.
        let removed: [&[u8]; 3] = [b"will-delete-me", b"key-0", b"key-4"];
        for key in removed.iter() {
            let err = db.get(&read_opts, *key).unwrap_err();
            assert!(err.is_not_found());
        }

        // First key outside the skipped range is untouched.
        assert_eq!(db.get(&read_opts, b"key-5").unwrap(), b"23333");

        assert_eq!(db.get(&read_opts, b"will-fix-me").unwrap(), b"a-typo-not-in-value");

        // Close the database before the temporary directory is removed.
        drop(db);
        drop(tmp_dir);
    }
}