1use error::{
23 GetError, MergeError, MergeErrorKind, OpenError, OpenErrorKind, RemoveError,
24 SetError,
25};
26use keydir::KeyDir;
27use logfile::LogFileEntry;
28use readers::Readers;
29
30use log::{info, trace};
31use writer::Writer;
32
33use std::sync::{Arc, Mutex, RwLock};
34use std::{
35 io::{Seek, SeekFrom},
36 path::{Path, PathBuf},
37};
38
39use crate::error::GetErrorKind;
40
41pub mod error;
43
44mod bufio;
45mod keydir;
46mod logfile;
47mod readers;
48mod utils;
49mod writer;
50
/// Monotonically increasing identifier for a data file generation.
/// Each rotation or merge advances to a higher generation.
type GenerationNumber = u64;

/// Default maximum size of the active data file before rotation: 2 GiB.
const MAX_DATA_FILE_SIZE: u64 = 2 * 1024 * 1024 * 1024;

/// Handle to a Bitcask-style log-structured key-value store.
///
/// Cloning is cheap and intentional: all state is shared behind `Arc`, so
/// clones operate on the same underlying store (exercised by
/// `test_data_file_rotation_cloned_stores`).
#[derive(Clone, Debug)]
pub struct Rustcask {
    // Serializes all mutations (set/remove/merge) behind a single writer.
    writer: Arc<Mutex<Writer>>,

    // Data file readers used to service `get` calls.
    readers: Readers,

    // In-memory index mapping each key to the location of its latest value.
    pub(crate) keydir: Arc<RwLock<KeyDir>>,

    // Whether writes should be synced to disk immediately.
    // NOTE(review): sync_mode is also passed to the Writer at construction;
    // this copy appears unread within this file — confirm before removing.
    sync_mode: bool,

    // Directory containing the data and hint files.
    pub(crate) directory: Arc<PathBuf>,
}
70
71impl Rustcask {
72 pub fn builder() -> RustcaskBuilder {
74 RustcaskBuilder::default()
75 }
76
77 pub fn set(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), SetError> {
100 trace!(
101 "Set called with key (as UTF 8) {}",
102 String::from_utf8_lossy(&key)
103 );
104
105 let mut writer = self
106 .writer
107 .lock()
108 .expect("Another thread crashed while holding the writer lock. Panicking.");
109
110 writer.set(key, value)
111 }
112
113 pub fn get<'a>(&'a mut self, key: &'a Vec<u8>) -> Result<Option<Vec<u8>>, GetError<'a>> {
132 trace!(
133 "Get called with key (as UTF 8) {}",
134 String::from_utf8_lossy(key)
135 );
136 let keydir = self
137 .keydir
138 .read()
139 .expect("Another thread panicked while holding the keydir lock. Panicking.");
140 let keydir_entry = keydir.get(key);
141 if keydir_entry.is_none() {
142 return Ok(None);
143 }
144 let keydir_entry = keydir_entry.unwrap();
145
146 let reader = self
147 .readers
148 .get_data_file_reader(keydir_entry.data_file_gen);
149
150 let log_index = &keydir_entry.index;
152 reader
153 .seek(SeekFrom::Start(log_index.offset))
154 .map_err(|err| GetError {
155 kind: GetErrorKind::Io(err),
156 key,
157 })?;
158
159 let data_file_entry: LogFileEntry =
160 bincode::deserialize_from(reader).map_err(|err| GetError {
161 kind: GetErrorKind::Deserialize(err),
162 key,
163 })?;
164
165 assert_eq!(
166 &data_file_entry.key, key,
167 "The deserialized entries key does not match the key passed to get. The data store could corrupted."
168 );
169
170 Ok(Some(data_file_entry.value.expect(
171 "We returned a tombstone value from get. We should have instead returned None.
172 The data store may not be corrupted - this indicates a programming bug.",
173 )))
174 }
175
176 pub fn remove(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, RemoveError> {
204 trace!(
205 "Remove called with key (as UTF 8) {}",
206 String::from_utf8_lossy(&key)
207 );
208 let mut writer = self
209 .writer
210 .lock()
211 .expect("Another thread crashed while holding the writer lock. Panicking.");
212
213 writer.remove(key)
214 }
215
216 pub fn merge(&mut self) -> Result<(), MergeError> {
231 let mut writer = self
237 .writer
238 .lock()
239 .expect("Another thread crashed while holding the writer lock. Panicking.");
240
241 if !writer.can_merge() {
242 return Err(MergeError {
243 kind: MergeErrorKind::OutsideMergeWindow,
244 merge_generation: writer.get_active_generation() + 1,
245 });
246 }
247
248 writer.merge()?;
249
250 info!("Merged data files.");
252
253 Ok(())
254 }
255
256 fn get_active_generation(&self) -> GenerationNumber {
258 let writer = self.writer.lock().expect(
259 "Another thread crashed while holding the writer lock. \
260 Panicking because the write lock is required to get the active generation.",
261 );
262 writer.get_active_generation()
263 }
264
265 fn get_active_data_file_size(&self) -> u64 {
266 let writer = self.writer.lock().expect(
267 "Another thread crashed while holding the writer lock. \
268 Panicking because the write lock is required to get the active data file size.",
269 );
270 writer.get_active_data_file_size()
271 }
272}
273
/// Builder for configuring and opening a [`Rustcask`] store.
///
/// Obtain one via `Rustcask::builder()`, adjust settings with the `set_*`
/// methods, then call `open` with the store directory.
pub struct RustcaskBuilder {
    // Maximum size in bytes of the active data file before writes rotate to
    // a new generation. Defaults to MAX_DATA_FILE_SIZE (2 GiB).
    max_data_file_size: u64,

    // When true, writes are synced to disk. Defaults to false.
    sync_mode: bool,
}
295
296impl Default for RustcaskBuilder {
297 fn default() -> Self {
298 Self {
299 max_data_file_size: MAX_DATA_FILE_SIZE,
300 sync_mode: false,
301 }
302 }
303}
304
305impl RustcaskBuilder {
306 pub fn set_max_data_file_size(mut self, max_size: u64) -> Self {
310 self.max_data_file_size = max_size;
311 self
312 }
313
314 pub fn set_sync_mode(mut self, sync_mode: bool) -> Self {
319 self.sync_mode = sync_mode;
320 self
321 }
322
323 pub fn open(self, rustcask_dir: &Path) -> Result<Rustcask, OpenError> {
325 trace!(
326 "Open called on directory {}",
327 rustcask_dir.to_string_lossy().to_string()
328 );
329 let rustcask_dir = Arc::new(PathBuf::from(rustcask_dir));
330
331 if !rustcask_dir.is_dir() {
332 return Err(OpenError {
333 kind: OpenErrorKind::BadDirectory,
334 rustcask_dir: rustcask_dir.to_string_lossy().to_string(),
335 });
336 }
337
338 let data_file_readers = Readers::new(rustcask_dir.clone()).map_err(|err| OpenError {
339 kind: OpenErrorKind::Io(err),
340 rustcask_dir: rustcask_dir.to_string_lossy().to_string(),
341 })?;
342
343 let keydir = Arc::new(RwLock::new(KeyDir::new(&rustcask_dir)?));
344
345 let writer = Arc::new(Mutex::new(Writer::new(
346 self.sync_mode,
347 self.max_data_file_size,
348 rustcask_dir.clone(),
349 keydir.clone(),
350 data_file_readers.clone(),
351 )?));
352
353 info!(
354 "Opened Rustcask directory {}. Max data file size: {}. Number of existing data files: {}. Active generation: {}. Sync mode: {}.",
355 rustcask_dir.to_string_lossy().to_string(),
356 self.max_data_file_size,
357 data_file_readers.data_file_readers.len(),
358 writer.lock().unwrap().get_active_generation(),
359 self.sync_mode
360 );
361
362 Ok(Rustcask {
363 readers: data_file_readers,
364 directory: rustcask_dir,
365 keydir,
366 sync_mode: self.sync_mode,
367 writer,
368 })
369 }
370}
371
#[cfg(test)]
mod tests {
    use std::fs::File;

    use super::*;
    use logfile::LogFileIterator;
    use tempfile::{tempdir, TempDir};
    use utils::{list_generations, tests::{file_names, get_keys, get_keys_values}};

    // Opening a directory with existing data/hint files resumes at the
    // highest generation found on disk.
    #[test]
    fn test_open() {
        let dir = tempdir().unwrap();

        for number in 1..=5 {
            File::create(dir.path().join(format!("{}.rustcask.data", number))).unwrap();
            File::create(dir.path().join(format!("{}.rustcask.hint", number))).unwrap();
        }

        let rustcask = Rustcask::builder().open(dir.path()).unwrap();

        assert_eq!(rustcask.get_active_generation(), 5);
    }

    // A fresh (empty) directory starts at generation 0.
    #[test]
    fn test_open_on_empty_dir() {
        let dir = tempdir().unwrap();
        let rustcask = Rustcask::builder().open(dir.path()).unwrap();
        assert_eq!(rustcask.get_active_generation(), 0);
    }

    // Opening a path that is not a directory fails with BadDirectory.
    #[test]
    fn test_open_non_existent_dir() {
        let dir = tempdir().unwrap();
        let invalid_dir = dir.path().join("invalid-dir");
        let rustcask = Rustcask::builder().open(&invalid_dir);
        assert!(matches!(
            rustcask,
            Err(OpenError {
                kind: OpenErrorKind::BadDirectory,
                ..
            })
        ));
    }

    // With a 1-byte size limit, every set rotates to a new data file, and the
    // value remains readable across the rotation.
    #[test]
    fn test_data_file_rotation() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();

        let keys = ["key1".as_bytes().to_vec(), "key2".as_bytes().to_vec()];
        let values = ["value1".as_bytes().to_vec(), "value2".as_bytes().to_vec()];

        assert_eq!(store.get_active_generation(), 0);
        assert_eq!(store.get_active_data_file_size(), 0);

        store.set(keys[0].clone(), values[0].clone()).unwrap();

        // The write exceeded the 1-byte limit, so the store rotated to
        // generation 1 with an empty active file.
        assert_eq!(store.get_active_generation(), 1);
        assert_eq!(store.get_active_data_file_size(), 0);
        assert_eq!(
            store.get(&keys[0].clone()).unwrap(),
            Some(values[0].clone())
        );

        let data_files = file_names(temp_dir_path);
        assert!(
            data_files.contains(&String::from("0.rustcask.data"))
                && data_files.contains(&String::from("1.rustcask.data"))
        );
    }

    // Merging collapses duplicate entries for a key down to only the latest
    // value, written into a new generation.
    #[test]
    fn test_merge_internal() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder().open(temp_dir_path).unwrap();

        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-a".as_bytes().to_vec(),
            )
            .unwrap();
        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-b".as_bytes().to_vec(),
            )
            .unwrap();

        // Both writes landed in generation 0 (default size limit not hit).
        let expected_data_files = vec!["0.rustcask.data"];
        let data_files = file_names(temp_dir_path);
        assert_eq!(data_files, expected_data_files);

        // Before the merge the log holds both entries for "leader".
        let log_file_keys = get_keys(temp_dir_path, &data_files[0]);
        assert_eq!(log_file_keys.len(), 2);
        assert_eq!(
            log_file_keys,
            vec!["leader".as_bytes().to_vec(), "leader".as_bytes().to_vec()]
        );

        store.merge().unwrap();

        // After the merge only the merged generation's file remains.
        let expected_data_files = vec!["1.rustcask.data"];
        let data_files = file_names(temp_dir_path);
        assert_eq!(data_files, expected_data_files);

        let log_file_iter = LogFileIterator::new(temp_dir_path.join("1.rustcask.data")).unwrap();

        let log_file_entries: Vec<(Vec<u8>, Vec<u8>)> = log_file_iter
            .map(|x| (x.0.key, x.0.value.unwrap()))
            .collect();

        // Only the latest value for "leader" survives the merge.
        assert_eq!(log_file_entries.len(), 1);
        assert_eq!(log_file_entries[0].0, "leader".as_bytes().to_vec());
        assert_eq!(log_file_entries[0].1, "instance-b".as_bytes().to_vec());
    }

    // Cloned handles share the same underlying store: writes through either
    // handle land in the shared sequence of data files.
    #[test]
    fn test_data_file_rotation_cloned_stores() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();
        let mut store_clone = store.clone();

        store
            .set("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())
            .unwrap();
        store_clone
            .set("key2".as_bytes().to_vec(), "value2".as_bytes().to_vec())
            .unwrap();

        // The first write went to generation 0...
        let log_file_keys = get_keys_values(temp_dir_path, &String::from("0.rustcask.data"));
        assert_eq!(log_file_keys.len(), 1);
        assert_eq!(
            log_file_keys,
            vec![("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec())]
        );

        // ...and, after rotation, the clone's write went to generation 1.
        let log_file_keys = get_keys_values(temp_dir_path, &String::from("1.rustcask.data"));
        assert_eq!(log_file_keys.len(), 1);
        assert_eq!(
            log_file_keys,
            vec![("key2".as_bytes().to_vec(), "value2".as_bytes().to_vec())]
        );
    }

    // Merging while the size limit forces rotation produces a fresh run of
    // generations, and all live values survive a reopen.
    #[test]
    fn test_merge_with_rotate() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();

        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-a".as_bytes().to_vec(),
            )
            .unwrap();
        store
            .set(
                "last-election-ts".as_bytes().to_vec(),
                "00:00".as_bytes().to_vec(),
            )
            .unwrap();
        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-b".as_bytes().to_vec(),
            )
            .unwrap();

        // Three writes with a 1-byte limit leave generations 0-3 on disk;
        // the merge rewrites live data into generations 4-6.
        check_generations(temp_dir_path, vec![0, 1, 2, 3]);
        store.merge().unwrap();
        check_generations(temp_dir_path, vec![4, 5, 6]);

        // Reopen the store and confirm the latest values were preserved.
        drop(store);
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();
        assert_eq!(
            store.get(&"leader".as_bytes().to_vec()).unwrap(),
            Some("instance-b".as_bytes().to_vec())
        );
        assert_eq!(
            store.get(&"last-election-ts".as_bytes().to_vec()).unwrap(),
            Some("00:00".as_bytes().to_vec())
        );
    }

    // A rotation triggered through one handle is visible through its clone:
    // both handles agree on the active generation and see each other's writes.
    #[test]
    fn test_active_gen_update() {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let temp_dir_path = temp_dir.path();
        let mut store = Rustcask::builder()
            .set_max_data_file_size(1)
            .open(temp_dir_path)
            .unwrap();

        let mut store_b = store.clone();

        store
            .set(
                "leader".as_bytes().to_vec(),
                "instance-a".as_bytes().to_vec(),
            )
            .unwrap();
        assert_eq!(store.get_active_generation(), 1);
        assert_eq!(store_b.get_active_generation(), 1);

        store_b
            .set("key".as_bytes().to_vec(), "value".as_bytes().to_vec())
            .unwrap();
        assert_eq!(
            store.get(&"key".as_bytes().to_vec()).unwrap(),
            Some("value".as_bytes().to_vec()),
        )
    }

    // Asserts that exactly `expected_generations` exist on disk, ignoring
    // directory-listing order.
    fn check_generations(temp_dir_path: &Path, expected_generations: Vec<GenerationNumber>) {
        let mut generations: Vec<GenerationNumber> = list_generations(temp_dir_path).unwrap();
        generations.sort_unstable();
        assert_eq!(generations, expected_generations);
    }
}