rocksdb_store/
lib.rs

1use mapper::TableMapper;
2use rocksdb::{
3    ColumnFamily, ColumnFamilyDescriptor, OptimisticTransactionDB, Options, TransactionDB,
4    TransactionDBOptions, DB,
5};
6
7use std::path::Path;
8
9pub mod error;
10pub mod mapper;
11pub mod wrapper;
12
13use error::Error;
14use wrapper::Db;
15
16const CONFIG_CF_NAME: &str = "_config";
17const BOOKS_CF_NAME: &str = "_books";
18
19type ConfigBincodeConfigType = bincode::config::Configuration<bincode::config::BigEndian>;
20type BooksBincodeConfigType = bincode::config::Configuration<bincode::config::BigEndian>;
21
22const CONFIG_BINCODE_CONFIG: ConfigBincodeConfigType =
23    bincode::config::standard().with_big_endian();
24const BOOKS_BINCODE_CONFIG: BooksBincodeConfigType = bincode::config::standard().with_big_endian();
25
26#[derive(Clone)]
27pub struct Database<const W: bool, C, B> {
28    pub db: Db,
29    pub config: C,
30    pub books: B,
31}
32
33impl<const W: bool, C, B> Database<W, C, B> {
34    fn config_cf(db: &Db) -> &ColumnFamily {
35        db.handle(CONFIG_CF_NAME)
36            .expect("Config table column family does not exist")
37    }
38
39    fn books_cf(db: &Db) -> &ColumnFamily {
40        db.handle(BOOKS_CF_NAME)
41            .expect("Books table column family does not exist")
42    }
43
44    fn config_mapper(db: &Db) -> TableMapper<'_, W, ConfigBincodeConfigType> {
45        mapper::TableMapper::new(db, Self::config_cf(db), CONFIG_BINCODE_CONFIG)
46    }
47
48    fn books_mapper(db: &Db) -> TableMapper<'_, W, BooksBincodeConfigType> {
49        mapper::TableMapper::new(db, Self::books_cf(db), BOOKS_BINCODE_CONFIG)
50    }
51}
52
53impl<C: serde::ser::Serialize, B: serde::ser::Serialize> Database<true, C, B> {
54    pub fn create<P: AsRef<Path>>(
55        path: P,
56        mut cfs: Vec<ColumnFamilyDescriptor>,
57        mut options: Options,
58        optimistic_transactions: bool,
59        config: C,
60        books: B,
61    ) -> Result<Self, Error> {
62        let config_cf = ColumnFamilyDescriptor::new(CONFIG_CF_NAME, Options::default());
63        let books_cf = ColumnFamilyDescriptor::new(BOOKS_CF_NAME, Options::default());
64
65        cfs.push(config_cf);
66        cfs.push(books_cf);
67
68        options.create_missing_column_families(true);
69        options.create_if_missing(true);
70
71        let db: Db = if optimistic_transactions {
72            OptimisticTransactionDB::open_cf_descriptors(&options, path, cfs)?.into()
73        } else {
74            let transaction_options = TransactionDBOptions::default();
75
76            TransactionDB::open_cf_descriptors(&options, &transaction_options, path, cfs)?.into()
77        };
78
79        Self::write_config_with_db(&db, &config)?;
80        Self::write_books_with_db(&db, &books)?;
81
82        Ok(Self { db, config, books })
83    }
84
85    pub fn write_config(&self, config: &C) -> Result<(), mapper::Error> {
86        Self::write_config_with_db(&self.db, config)
87    }
88
89    pub fn write_books(&self, books: &B) -> Result<(), mapper::Error> {
90        Self::write_books_with_db(&self.db, books)
91    }
92
93    fn write_config_with_db(db: &Db, config: &C) -> Result<(), mapper::Error> {
94        config.serialize(Self::config_mapper(db))
95    }
96
97    fn write_books_with_db(db: &Db, books: &B) -> Result<(), mapper::Error> {
98        books.serialize(Self::books_mapper(db))
99    }
100}
101
102impl<'de, C: serde::de::Deserialize<'de>, B: serde::de::Deserialize<'de>> Database<true, C, B> {
103    pub fn open_with_pessimistic_transactions<P: AsRef<Path>>(
104        path: P,
105        cfs: Vec<ColumnFamilyDescriptor>,
106        options: Options,
107    ) -> Result<Self, Error> {
108        Database::open_internal(path, cfs, options, true)
109    }
110}
111
112impl<'de, const W: bool, C: serde::de::Deserialize<'de>, B: serde::de::Deserialize<'de>>
113    Database<W, C, B>
114{
115    pub fn open<P: AsRef<Path>>(
116        path: P,
117        cfs: Vec<ColumnFamilyDescriptor>,
118        options: Options,
119    ) -> Result<Self, Error> {
120        Self::open_internal(path, cfs, options, true)
121    }
122
123    pub fn read_config(&self) -> Result<C, mapper::Error> {
124        Self::read_config_with_db(&self.db)
125    }
126
127    pub fn read_books(&self) -> Result<B, mapper::Error> {
128        Self::read_books_with_db(&self.db)
129    }
130
131    fn open_internal<P: AsRef<Path>>(
132        path: P,
133        mut cfs: Vec<ColumnFamilyDescriptor>,
134        options: Options,
135        optimistic_transactions: bool,
136    ) -> Result<Self, Error> {
137        let config_cf = ColumnFamilyDescriptor::new(CONFIG_CF_NAME, Options::default());
138        let books_cf = ColumnFamilyDescriptor::new(BOOKS_CF_NAME, Options::default());
139
140        cfs.push(config_cf);
141        cfs.push(books_cf);
142
143        let db: Db = if !W {
144            DB::open_cf_descriptors_read_only(&options, path, cfs, false)?.into()
145        } else if optimistic_transactions {
146            OptimisticTransactionDB::open_cf_descriptors(&options, path, cfs)?.into()
147        } else {
148            let transaction_options = TransactionDBOptions::default();
149
150            TransactionDB::open_cf_descriptors(&options, &transaction_options, path, cfs)?.into()
151        };
152
153        let config = Self::read_config_with_db(&db)?;
154        let books = Self::read_books_with_db(&db)?;
155
156        Ok(Self { db, config, books })
157    }
158
159    fn read_config_with_db(db: &Db) -> Result<C, mapper::Error> {
160        C::deserialize(&Self::config_mapper(db))
161    }
162
163    fn read_books_with_db(db: &Db) -> Result<B, mapper::Error> {
164        B::deserialize(&Self::books_mapper(db))
165    }
166}
167
168impl<C, B> Database<true, C, B> {
169    pub fn admin<P: AsRef<Path>>(
170        path: P,
171        mut cfs: Vec<ColumnFamilyDescriptor>,
172    ) -> Result<Admin, Error> {
173        let config_cf = ColumnFamilyDescriptor::new(CONFIG_CF_NAME, Options::default());
174        let books_cf = ColumnFamilyDescriptor::new(BOOKS_CF_NAME, Options::default());
175
176        cfs.push(config_cf);
177        cfs.push(books_cf);
178
179        let cf_names = cfs.iter().map(|cf| cf.name().to_string()).collect();
180
181        Ok(Admin {
182            underlying: DB::open_cf_descriptors(&Options::default(), path, cfs)?,
183            cf_names,
184        })
185    }
186}
187
188impl<C, B> Database<false, C, B> {
189    pub fn underlying(&self) -> &DB {
190        // Safe because we know statically that the database is read-only.
191        self.db.read_only().unwrap()
192    }
193}
194
195pub struct Admin {
196    underlying: DB,
197    cf_names: Vec<String>,
198}
199
200impl Admin {
201    pub fn flush(&self) -> Result<(), rocksdb::Error> {
202        for cf_name in &self.cf_names {
203            if let Some(cf) = self.underlying.cf_handle(cf_name) {
204                self.underlying.flush_cf(cf)?;
205            }
206        }
207
208        Ok(())
209    }
210
211    pub fn compact(&self) -> Result<(), rocksdb::Error> {
212        let mut options = rocksdb::CompactOptions::default();
213        options.set_change_level(true);
214
215        for cf_name in &self.cf_names {
216            if let Some(cf) = self.underlying.cf_handle(cf_name) {
217                self.underlying
218                    .compact_range_cf_opt::<&[u8], &[u8]>(cf, None, None, &options);
219            }
220        }
221
222        self.underlying.wait_for_compact(&Default::default())
223    }
224}
225
#[cfg(test)]
mod tests {
    use quickcheck_arbitrary_derive::QuickCheck;

    /// Sample fieldless enum used as part of the test config value.
    #[derive(
        Clone,
        Copy,
        Debug,
        Default,
        Eq,
        PartialEq,
        QuickCheck,
        serde_derive::Deserialize,
        serde_derive::Serialize,
    )]
    pub enum Hashes {
        #[default]
        Both,
        Md5Only,
        Sha256Only,
    }

    /// Sample config value stored in the `_config` table.
    #[derive(
        Clone, Debug, Eq, PartialEq, QuickCheck, serde_derive::Deserialize, serde_derive::Serialize,
    )]
    struct Config {
        hashes: Hashes,
        case_sensitive: bool,
    }

    /// Sample books value stored in the `_books` table.
    #[derive(
        Clone, Debug, Eq, PartialEq, QuickCheck, serde_derive::Deserialize, serde_derive::Serialize,
    )]
    struct Books {
        last_scrape_ms: u64,
        region: String,
    }

    // Values written by `create` (optimistic transactions) are returned by a later `open`.
    #[quickcheck_macros::quickcheck]
    fn round_trip_instantiate(config: Config, books: Books) -> bool {
        let test_db_dir = tempfile::tempdir().unwrap();

        let writeable_db = super::Database::create(
            &test_db_dir,
            vec![],
            Default::default(),
            true,
            config.clone(),
            books.clone(),
        )
        .unwrap();

        writeable_db.db.close();

        // Borrow the directory: moving the `TempDir` into `open` would drop it (deleting the
        // directory) as soon as the path argument is consumed, while the database is in use.
        let reopened_db =
            super::Database::<true, Config, Books>::open(&test_db_dir, vec![], Default::default())
                .unwrap();

        reopened_db.config == config && reopened_db.books == books
    }

    // Same round trip with unit values for both config and books.
    #[quickcheck_macros::quickcheck]
    fn round_trip_instantiate_with_unit(config: (), books: ()) -> bool {
        let test_db_dir = tempfile::tempdir().unwrap();

        let writeable_db =
            super::Database::create(&test_db_dir, vec![], Default::default(), true, (), ())
                .unwrap();

        writeable_db.db.close();

        // Borrow the directory so it outlives the reopened database.
        let reopened_db =
            super::Database::<true, (), ()>::open(&test_db_dir, vec![], Default::default())
                .unwrap();

        reopened_db.config == config && reopened_db.books == books
    }

    // Values written by `create` with pessimistic transactions are returned by a later `open`.
    #[quickcheck_macros::quickcheck]
    fn round_trip_instantiate_with_pessimistic_transactions(config: Config, books: Books) -> bool {
        let test_db_dir = tempfile::tempdir().unwrap();

        let writeable_db = super::Database::create(
            &test_db_dir,
            vec![],
            Default::default(),
            false,
            config.clone(),
            books.clone(),
        )
        .unwrap();

        writeable_db.db.close();

        // Borrow the directory so it outlives the reopened database.
        let reopened_db =
            super::Database::<true, Config, Books>::open(&test_db_dir, vec![], Default::default())
                .unwrap();

        reopened_db.config == config && reopened_db.books == books
    }

    // After reopening, newly written values replace the ones stored at creation.
    #[quickcheck_macros::quickcheck]
    fn round_trip_write(
        config: Config,
        books: Books,
        new_config: Config,
        new_books: Books,
    ) -> bool {
        let test_db_dir = tempfile::tempdir().unwrap();

        let writeable_db = super::Database::create(
            &test_db_dir,
            vec![],
            Default::default(),
            true,
            config.clone(),
            books.clone(),
        )
        .unwrap();

        writeable_db.db.close();

        let writeable_db =
            super::Database::<true, Config, Books>::open(&test_db_dir, vec![], Default::default())
                .unwrap();

        assert!(writeable_db.config == config && writeable_db.books == books);

        writeable_db.write_config(&new_config).unwrap();
        writeable_db.write_books(&new_books).unwrap();

        writeable_db.read_config().unwrap() == new_config
            && writeable_db.read_books().unwrap() == new_books
    }

    // Same write round trip, creating and reopening with pessimistic transactions.
    #[quickcheck_macros::quickcheck]
    fn round_trip_write_with_pessimistic_transactions(
        config: Config,
        books: Books,
        new_config: Config,
        new_books: Books,
    ) -> bool {
        let test_db_dir = tempfile::tempdir().unwrap();

        let writeable_db = super::Database::create(
            &test_db_dir,
            vec![],
            Default::default(),
            false,
            config.clone(),
            books.clone(),
        )
        .unwrap();

        writeable_db.db.close();

        let writeable_db =
            super::Database::<true, Config, Books>::open_with_pessimistic_transactions(
                &test_db_dir,
                vec![],
                Default::default(),
            )
            .unwrap();

        assert!(writeable_db.config == config && writeable_db.books == books);

        writeable_db.write_config(&new_config).unwrap();
        writeable_db.write_books(&new_books).unwrap();

        writeable_db.read_config().unwrap() == new_config
            && writeable_db.read_books().unwrap() == new_books
    }
}