1use mapper::TableMapper;
2use rocksdb::{
3 ColumnFamily, ColumnFamilyDescriptor, OptimisticTransactionDB, Options, TransactionDB,
4 TransactionDBOptions, DB,
5};
6
7use std::path::Path;
8
9pub mod error;
10pub mod mapper;
11pub mod wrapper;
12
13use error::Error;
14use wrapper::Db;
15
16const CONFIG_CF_NAME: &str = "_config";
17const BOOKS_CF_NAME: &str = "_books";
18
19type ConfigBincodeConfigType = bincode::config::Configuration<bincode::config::BigEndian>;
20type BooksBincodeConfigType = bincode::config::Configuration<bincode::config::BigEndian>;
21
22const CONFIG_BINCODE_CONFIG: ConfigBincodeConfigType =
23 bincode::config::standard().with_big_endian();
24const BOOKS_BINCODE_CONFIG: BooksBincodeConfigType = bincode::config::standard().with_big_endian();
25
26#[derive(Clone)]
27pub struct Database<const W: bool, C, B> {
28 pub db: Db,
29 pub config: C,
30 pub books: B,
31}
32
33impl<const W: bool, C, B> Database<W, C, B> {
34 fn config_cf(db: &Db) -> &ColumnFamily {
35 db.handle(CONFIG_CF_NAME)
36 .expect("Config table column family does not exist")
37 }
38
39 fn books_cf(db: &Db) -> &ColumnFamily {
40 db.handle(BOOKS_CF_NAME)
41 .expect("Books table column family does not exist")
42 }
43
44 fn config_mapper(db: &Db) -> TableMapper<'_, W, ConfigBincodeConfigType> {
45 mapper::TableMapper::new(db, Self::config_cf(db), CONFIG_BINCODE_CONFIG)
46 }
47
48 fn books_mapper(db: &Db) -> TableMapper<'_, W, BooksBincodeConfigType> {
49 mapper::TableMapper::new(db, Self::books_cf(db), BOOKS_BINCODE_CONFIG)
50 }
51}
52
53impl<C: serde::ser::Serialize, B: serde::ser::Serialize> Database<true, C, B> {
54 pub fn create<P: AsRef<Path>>(
55 path: P,
56 mut cfs: Vec<ColumnFamilyDescriptor>,
57 mut options: Options,
58 optimistic_transactions: bool,
59 config: C,
60 books: B,
61 ) -> Result<Self, Error> {
62 let config_cf = ColumnFamilyDescriptor::new(CONFIG_CF_NAME, Options::default());
63 let books_cf = ColumnFamilyDescriptor::new(BOOKS_CF_NAME, Options::default());
64
65 cfs.push(config_cf);
66 cfs.push(books_cf);
67
68 options.create_missing_column_families(true);
69 options.create_if_missing(true);
70
71 let db: Db = if optimistic_transactions {
72 OptimisticTransactionDB::open_cf_descriptors(&options, path, cfs)?.into()
73 } else {
74 let transaction_options = TransactionDBOptions::default();
75
76 TransactionDB::open_cf_descriptors(&options, &transaction_options, path, cfs)?.into()
77 };
78
79 Self::write_config_with_db(&db, &config)?;
80 Self::write_books_with_db(&db, &books)?;
81
82 Ok(Self { db, config, books })
83 }
84
85 pub fn write_config(&self, config: &C) -> Result<(), mapper::Error> {
86 Self::write_config_with_db(&self.db, config)
87 }
88
89 pub fn write_books(&self, books: &B) -> Result<(), mapper::Error> {
90 Self::write_books_with_db(&self.db, books)
91 }
92
93 fn write_config_with_db(db: &Db, config: &C) -> Result<(), mapper::Error> {
94 config.serialize(Self::config_mapper(db))
95 }
96
97 fn write_books_with_db(db: &Db, books: &B) -> Result<(), mapper::Error> {
98 books.serialize(Self::books_mapper(db))
99 }
100}
101
102impl<'de, C: serde::de::Deserialize<'de>, B: serde::de::Deserialize<'de>> Database<true, C, B> {
103 pub fn open_with_pessimistic_transactions<P: AsRef<Path>>(
104 path: P,
105 cfs: Vec<ColumnFamilyDescriptor>,
106 options: Options,
107 ) -> Result<Self, Error> {
108 Database::open_internal(path, cfs, options, true)
109 }
110}
111
112impl<'de, const W: bool, C: serde::de::Deserialize<'de>, B: serde::de::Deserialize<'de>>
113 Database<W, C, B>
114{
115 pub fn open<P: AsRef<Path>>(
116 path: P,
117 cfs: Vec<ColumnFamilyDescriptor>,
118 options: Options,
119 ) -> Result<Self, Error> {
120 Self::open_internal(path, cfs, options, true)
121 }
122
123 pub fn read_config(&self) -> Result<C, mapper::Error> {
124 Self::read_config_with_db(&self.db)
125 }
126
127 pub fn read_books(&self) -> Result<B, mapper::Error> {
128 Self::read_books_with_db(&self.db)
129 }
130
131 fn open_internal<P: AsRef<Path>>(
132 path: P,
133 mut cfs: Vec<ColumnFamilyDescriptor>,
134 options: Options,
135 optimistic_transactions: bool,
136 ) -> Result<Self, Error> {
137 let config_cf = ColumnFamilyDescriptor::new(CONFIG_CF_NAME, Options::default());
138 let books_cf = ColumnFamilyDescriptor::new(BOOKS_CF_NAME, Options::default());
139
140 cfs.push(config_cf);
141 cfs.push(books_cf);
142
143 let db: Db = if !W {
144 DB::open_cf_descriptors_read_only(&options, path, cfs, false)?.into()
145 } else if optimistic_transactions {
146 OptimisticTransactionDB::open_cf_descriptors(&options, path, cfs)?.into()
147 } else {
148 let transaction_options = TransactionDBOptions::default();
149
150 TransactionDB::open_cf_descriptors(&options, &transaction_options, path, cfs)?.into()
151 };
152
153 let config = Self::read_config_with_db(&db)?;
154 let books = Self::read_books_with_db(&db)?;
155
156 Ok(Self { db, config, books })
157 }
158
159 fn read_config_with_db(db: &Db) -> Result<C, mapper::Error> {
160 C::deserialize(&Self::config_mapper(db))
161 }
162
163 fn read_books_with_db(db: &Db) -> Result<B, mapper::Error> {
164 B::deserialize(&Self::books_mapper(db))
165 }
166}
167
168impl<C, B> Database<true, C, B> {
169 pub fn admin<P: AsRef<Path>>(
170 path: P,
171 mut cfs: Vec<ColumnFamilyDescriptor>,
172 ) -> Result<Admin, Error> {
173 let config_cf = ColumnFamilyDescriptor::new(CONFIG_CF_NAME, Options::default());
174 let books_cf = ColumnFamilyDescriptor::new(BOOKS_CF_NAME, Options::default());
175
176 cfs.push(config_cf);
177 cfs.push(books_cf);
178
179 let cf_names = cfs.iter().map(|cf| cf.name().to_string()).collect();
180
181 Ok(Admin {
182 underlying: DB::open_cf_descriptors(&Options::default(), path, cfs)?,
183 cf_names,
184 })
185 }
186}
187
188impl<C, B> Database<false, C, B> {
189 pub fn underlying(&self) -> &DB {
190 self.db.read_only().unwrap()
192 }
193}
194
195pub struct Admin {
196 underlying: DB,
197 cf_names: Vec<String>,
198}
199
200impl Admin {
201 pub fn flush(&self) -> Result<(), rocksdb::Error> {
202 for cf_name in &self.cf_names {
203 if let Some(cf) = self.underlying.cf_handle(cf_name) {
204 self.underlying.flush_cf(cf)?;
205 }
206 }
207
208 Ok(())
209 }
210
211 pub fn compact(&self) -> Result<(), rocksdb::Error> {
212 let mut options = rocksdb::CompactOptions::default();
213 options.set_change_level(true);
214
215 for cf_name in &self.cf_names {
216 if let Some(cf) = self.underlying.cf_handle(cf_name) {
217 self.underlying
218 .compact_range_cf_opt::<&[u8], &[u8]>(cf, None, None, &options);
219 }
220 }
221
222 self.underlying.wait_for_compact(&Default::default())
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use quickcheck_arbitrary_derive::QuickCheck;
229
230 #[derive(
231 Clone,
232 Copy,
233 Debug,
234 Default,
235 Eq,
236 PartialEq,
237 QuickCheck,
238 serde_derive::Deserialize,
239 serde_derive::Serialize,
240 )]
241 pub enum Hashes {
242 #[default]
243 Both,
244 Md5Only,
245 Sha256Only,
246 }
247
248 #[derive(
249 Clone, Debug, Eq, PartialEq, QuickCheck, serde_derive::Deserialize, serde_derive::Serialize,
250 )]
251 struct Config {
252 hashes: Hashes,
253 case_sensitive: bool,
254 }
255
256 #[derive(
257 Clone, Debug, Eq, PartialEq, QuickCheck, serde_derive::Deserialize, serde_derive::Serialize,
258 )]
259 struct Books {
260 last_scrape_ms: u64,
261 region: String,
262 }
263
264 #[quickcheck_macros::quickcheck]
265 fn round_trip_instantiate(config: Config, books: Books) -> bool {
266 let test_db_dir = tempfile::tempdir().unwrap();
267
268 let writeable_db = super::Database::create(
269 &test_db_dir,
270 vec![],
271 Default::default(),
272 true,
273 config.clone(),
274 books.clone(),
275 )
276 .unwrap();
277
278 writeable_db.db.close();
279
280 let read_only_db =
281 super::Database::<true, Config, Books>::open(test_db_dir, vec![], Default::default())
282 .unwrap();
283
284 read_only_db.config == config && read_only_db.books == books
285 }
286
287 #[quickcheck_macros::quickcheck]
288 fn round_trip_instantiate_with_unit(config: (), books: ()) -> bool {
289 let test_db_dir = tempfile::tempdir().unwrap();
290
291 let writeable_db =
292 super::Database::create(&test_db_dir, vec![], Default::default(), true, (), ())
293 .unwrap();
294
295 writeable_db.db.close();
296
297 let read_only_db =
298 super::Database::<true, (), ()>::open(test_db_dir, vec![], Default::default()).unwrap();
299
300 read_only_db.config == config && read_only_db.books == books
301 }
302
303 #[quickcheck_macros::quickcheck]
304 fn round_trip_instantiate_with_pessimistic_transactions(config: Config, books: Books) -> bool {
305 let test_db_dir = tempfile::tempdir().unwrap();
306
307 let writeable_db = super::Database::create(
308 &test_db_dir,
309 vec![],
310 Default::default(),
311 false,
312 config.clone(),
313 books.clone(),
314 )
315 .unwrap();
316
317 writeable_db.db.close();
318
319 let read_only_db =
320 super::Database::<true, Config, Books>::open(test_db_dir, vec![], Default::default())
321 .unwrap();
322
323 read_only_db.config == config && read_only_db.books == books
324 }
325
326 #[quickcheck_macros::quickcheck]
327 fn round_trip_write(
328 config: Config,
329 books: Books,
330 new_config: Config,
331 new_books: Books,
332 ) -> bool {
333 let test_db_dir = tempfile::tempdir().unwrap();
334
335 let writeable_db = super::Database::create(
336 &test_db_dir,
337 vec![],
338 Default::default(),
339 true,
340 config.clone(),
341 books.clone(),
342 )
343 .unwrap();
344
345 writeable_db.db.close();
346
347 let writeable_db =
348 super::Database::<true, Config, Books>::open(&test_db_dir, vec![], Default::default())
349 .unwrap();
350
351 assert!(writeable_db.config == config && writeable_db.books == books);
352
353 writeable_db.write_config(&new_config).unwrap();
354 writeable_db.write_books(&new_books).unwrap();
355
356 writeable_db.read_config().unwrap() == new_config
357 && writeable_db.read_books().unwrap() == new_books
358 }
359
360 #[quickcheck_macros::quickcheck]
361 fn round_trip_write_with_pessimistic_transactions(
362 config: Config,
363 books: Books,
364 new_config: Config,
365 new_books: Books,
366 ) -> bool {
367 let test_db_dir = tempfile::tempdir().unwrap();
368
369 let writeable_db = super::Database::create(
370 &test_db_dir,
371 vec![],
372 Default::default(),
373 false,
374 config.clone(),
375 books.clone(),
376 )
377 .unwrap();
378
379 writeable_db.db.close();
380
381 let writeable_db =
382 super::Database::<true, Config, Books>::open_with_pessimistic_transactions(
383 &test_db_dir,
384 vec![],
385 Default::default(),
386 )
387 .unwrap();
388
389 assert!(writeable_db.config == config && writeable_db.books == books);
390
391 writeable_db.write_config(&new_config).unwrap();
392 writeable_db.write_books(&new_books).unwrap();
393
394 writeable_db.read_config().unwrap() == new_config
395 && writeable_db.read_books().unwrap() == new_books
396 }
397}