1pub use self::address::Address;
17pub use self::builder::StorageBuilder;
18pub use self::header::StorageHeader;
19pub use self::journal::{JournalEntry, JournalRecord, JournalSnapshot};
20
21pub(crate) use self::data_region::DataRegionLumpData; use self::data_region::DataRegion;
24use self::index::LumpIndex;
25use self::journal::JournalRegion;
26use self::portion::Portion;
27use block::BlockSize;
28use lump::{LumpData, LumpDataInner, LumpHeader, LumpId};
29use metrics::StorageMetrics;
30use nvm::NonVolatileMemory;
31use std::ops::Range;
32use Result;
33
34mod address;
35mod allocator;
36mod builder;
37mod data_region;
38mod header;
39mod index;
40mod journal;
41mod portion;
42
/// Magic number of the storage format: the four ASCII bytes `lusf`.
///
/// NOTE(review): presumably stored in the storage header — confirm in `StorageHeader`.
pub const MAGIC_NUMBER: [u8; 4] = *b"lusf";

/// Major version of the storage format.
///
/// The tests in this module check that a newly created storage reports this
/// value as `header.major_version`.
pub const MAJOR_VERSION: u16 = 1;

/// Minor version of the storage format.
///
/// The tests in this module check that a newly created storage reports this
/// value as `header.minor_version`.
pub const MINOR_VERSION: u16 = 1;

/// Upper bound of the journal region size; equal to `Address::MAX`.
///
/// NOTE(review): presumably expressed in bytes — confirm against `JournalRegion`.
pub const MAX_JOURNAL_REGION_SIZE: u64 = Address::MAX;

/// Upper bound of the data region size: `Address::MAX` multiplied by `BlockSize::MIN`.
///
/// NOTE(review): presumably expressed in bytes — confirm against `DataRegion`.
pub const MAX_DATA_REGION_SIZE: u64 = Address::MAX * BlockSize::MIN as u64;
67
/// A lump storage made of a header, a journal region, a data region,
/// a lump index and metrics.
///
/// `N` is the non-volatile memory type used by the journal and data regions.
#[derive(Debug)]
pub struct Storage<N>
where
    N: NonVolatileMemory,
{
    /// Header of this storage; exposed through `header()`.
    header: StorageHeader,
    /// Region that receives put/delete records and embedded lump data.
    journal_region: JournalRegion<N>,
    /// Region that holds the data of lumps not embedded in the journal.
    data_region: DataRegion<N>,
    /// Maps each lump ID to the portion (journal or data) holding its data.
    lump_index: LumpIndex,
    /// Counters updated by the operations of this storage; exposed through `metrics()`.
    metrics: StorageMetrics,
}
87impl<N> Storage<N>
88where
89 N: NonVolatileMemory,
90{
91 pub(crate) fn new(
92 header: StorageHeader,
93 journal_region: JournalRegion<N>,
94 data_region: DataRegion<N>,
95 lump_index: LumpIndex,
96 metrics: StorageMetrics,
97 ) -> Self {
98 Storage {
99 header,
100 journal_region,
101 data_region,
102 lump_index,
103 metrics,
104 }
105 }
106
    /// Creates a new storage on `nvm` using a `StorageBuilder` with its default settings.
    ///
    /// # Errors
    ///
    /// Returns the tracked error reported by `StorageBuilder::create`.
    pub fn create(nvm: N) -> Result<Self> {
        track!(StorageBuilder::new().create(nvm))
    }
111
    /// Opens the storage stored on `nvm` using a `StorageBuilder` with its default settings.
    ///
    /// # Errors
    ///
    /// Returns the tracked error reported by `StorageBuilder::open`.
    pub fn open(nvm: N) -> Result<Self> {
        track!(StorageBuilder::new().open(nvm))
    }
116
    /// Returns a reference to the header of this storage.
    pub fn header(&self) -> &StorageHeader {
        &self.header
    }
121
    /// Returns a reference to the metrics of this storage.
    pub fn metrics(&self) -> &StorageMetrics {
        &self.metrics
    }
126
    /// Returns the usage that the lump index reports for the lumps in `range`.
    ///
    /// The block size from this storage's header is passed to the index for the
    /// computation.
    pub fn usage_range(&self, range: Range<LumpId>) -> StorageUsage {
        self.lump_index.usage_range(range, self.header.block_size)
    }
131
132 pub fn get(&mut self, lump_id: &LumpId) -> Result<Option<LumpData>> {
141 match self.lump_index.get(lump_id) {
142 None => Ok(None),
143 Some(portion) => {
144 let data = match portion {
145 Portion::Journal(portion) => {
146 self.metrics.get_journal_lumps.increment();
147 let bytes = track!(self.journal_region.get_embedded_data(portion))?;
148 track!(LumpData::new_embedded(bytes))?
149 }
150 Portion::Data(portion) => {
151 self.metrics.get_data_lumps.increment();
152 track!(self.data_region.get(portion).map(LumpData::from))?
153 }
154 };
155 Ok(Some(data))
156 }
157 }
158 }
159
160 pub fn head(&self, lump_id: &LumpId) -> Option<LumpHeader> {
162 self.lump_index.get(lump_id).map(|portion| LumpHeader {
163 approximate_data_size: portion.len(self.header.block_size),
164 })
165 }
166
    /// Returns the IDs of all lumps registered in the lump index.
    pub fn list(&self) -> Vec<LumpId> {
        self.lump_index.list()
    }
178
    /// Returns the IDs of the lumps registered in the lump index that fall within `range`.
    pub fn list_range(&mut self, range: Range<LumpId>) -> Vec<LumpId> {
        self.lump_index.list_range(range)
    }
183
184 pub fn put(&mut self, lump_id: &LumpId, data: &LumpData) -> Result<bool> {
202 let updated = track!(self.delete_if_exists(lump_id, false))?;
203 match data.as_inner() {
204 LumpDataInner::JournalRegion(data) => {
205 track!(self
206 .journal_region
207 .records_embed(&mut self.lump_index, lump_id, data))?;
208 }
209 LumpDataInner::DataRegion(data) => {
210 track!(self.put_lump_to_data_region(lump_id, data))?;
211 }
212 LumpDataInner::DataRegionUnaligned(data) => {
213 let mut aligned_data = DataRegionLumpData::new(data.len(), self.header.block_size);
214 aligned_data.as_bytes_mut().copy_from_slice(data);
215 track!(self.put_lump_to_data_region(lump_id, &aligned_data))?;
216 }
217 }
218 self.metrics.put_lumps_at_running.increment();
219 Ok(!updated)
220 }
221
    /// Deletes the lump identified by `lump_id`.
    ///
    /// Returns `Ok(true)` if the lump existed and `Ok(false)` otherwise.
    /// The deletion is recorded in the journal region (`do_record` is `true`).
    pub fn delete(&mut self, lump_id: &LumpId) -> Result<bool> {
        track!(self.delete_if_exists(lump_id, true))
    }
234
235 pub fn delete_range(&mut self, range: Range<LumpId>) -> Result<Vec<LumpId>> {
251 let targets = self.lump_index.list_range(range.clone());
252
253 track!(self
256 .journal_region
257 .records_delete_range(&mut self.lump_index, range))?;
258
259 for lump_id in &targets {
260 if let Some(portion) = self.lump_index.remove(lump_id) {
261 self.metrics.delete_lumps.increment();
262
263 if let Portion::Data(portion) = portion {
264 self.data_region.delete(portion);
268 }
269 }
270 }
271
272 Ok(targets)
273 }
274
    /// Allocates a `LumpData` of `size` bytes via `LumpData::aligned_allocate`,
    /// using the block size of this storage.
    ///
    /// # Errors
    ///
    /// Returns the tracked error reported by `LumpData::aligned_allocate`.
    pub fn allocate_lump_data(&self, size: usize) -> Result<LumpData> {
        track!(LumpData::aligned_allocate(size, self.header.block_size))
    }
292
293 pub fn allocate_lump_data_with_bytes(&self, bytes: &[u8]) -> Result<LumpData> {
303 let mut data = track!(self.allocate_lump_data(bytes.len()))?;
304 data.as_bytes_mut().copy_from_slice(bytes);
305 Ok(data)
306 }
307
308 pub fn run_side_job_once(&mut self) -> Result<()> {
314 track!(self.journal_region.run_side_job_once(&mut self.lump_index))?;
315 Ok(())
316 }
317
318 pub fn journal_sync(&mut self) -> Result<()> {
321 self.journal_region.sync()
322 }
323
324 pub fn journal_gc(&mut self) -> Result<()> {
333 self.journal_region.gc_all_entries(&mut self.lump_index)
334 }
335
336 pub fn journal_snapshot(&mut self) -> Result<JournalSnapshot> {
338 let (unreleased_head, head, tail, entries) = track!(self.journal_region.journal_entries())?;
339 Ok(JournalSnapshot {
340 unreleased_head,
341 head,
342 tail,
343 entries,
344 })
345 }
346
    /// Forwards `enable` to `JournalRegion::set_automatic_gc_mode`.
    ///
    /// Within this file it is only called from the tests, which disable the
    /// automatic mode before inspecting journal snapshots; hence `dead_code`
    /// is allowed.
    #[allow(dead_code)]
    pub(crate) fn set_automatic_gc_mode(&mut self, enable: bool) {
        self.journal_region.set_automatic_gc_mode(enable);
    }
357
358 fn put_lump_to_data_region(
359 &mut self,
360 lump_id: &LumpId,
361 data: &DataRegionLumpData,
362 ) -> Result<()> {
363 let portion = track!(self.data_region.put(data))?;
364 track!(self
365 .journal_region
366 .records_put(&mut self.lump_index, lump_id, portion)
367 .map_err(|e| {
368 self.data_region.delete(portion);
369 e
370 }))?;
371 self.lump_index.insert(*lump_id, Portion::Data(portion));
372 Ok(())
373 }
374
375 fn delete_if_exists(&mut self, lump_id: &LumpId, do_record: bool) -> Result<bool> {
376 if let Some(portion) = self.lump_index.remove(lump_id) {
377 self.metrics.delete_lumps.increment();
378 if do_record {
379 track!(self
380 .journal_region
381 .records_delete(&mut self.lump_index, lump_id,))?;
382 }
383 if let Portion::Data(portion) = portion {
384 self.data_region.delete(portion);
385 }
386 Ok(true)
387 } else {
388 Ok(false)
389 }
390 }
391}
392
/// Amount of storage in use, as returned by `Storage::usage_range`.
///
/// Values can be compared with `==`, which lets callers and tests check a
/// reported usage directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageUsage {
    /// The usage is not known.
    Unknown,
    /// An approximate usage; `bytecount` returns the wrapped value.
    Approximate(u64),
}
impl StorageUsage {
    /// Creates a `StorageUsage::Approximate` holding `usage`.
    pub fn approximate(usage: u64) -> Self {
        StorageUsage::Approximate(usage)
    }

    /// Creates a `StorageUsage::Unknown`.
    pub fn unknown() -> Self {
        StorageUsage::Unknown
    }

    /// Returns the approximate value, or `None` when the usage is unknown.
    pub fn bytecount(&self) -> Option<u64> {
        match *self {
            StorageUsage::Unknown => None,
            StorageUsage::Approximate(bytes) => Some(bytes),
        }
    }
}
impl Default for StorageUsage {
    /// The default usage is `StorageUsage::Unknown`.
    fn default() -> Self {
        StorageUsage::Unknown
    }
}
425
426#[cfg(test)]
427mod tests {
428 use std::fs::OpenOptions;
429 use std::mem;
430 use tempdir::TempDir;
431 use trackable::result::TestResult;
432
433 use super::*;
434 use block::BlockSize;
435 use lump::{LumpData, LumpId};
436 use nvm::{FileNvm, SharedMemoryNvm};
437 use ErrorKind;
438
439 #[test]
440 fn it_works() -> TestResult {
441 let dir = track_io!(TempDir::new("cannyls_test"))?;
442
443 let nvm = track!(FileNvm::create(
445 dir.path().join("test.lusf"),
446 BlockSize::min().ceil_align(1024 * 1024)
447 ))?;
448 let mut storage = track!(Storage::create(nvm))?;
449
450 assert!(storage.get(&id("000"))?.is_none());
451 assert!(storage.put(&id("000"), &data("hello"))?);
452 assert!(!storage.put(&id("000"), &data("hello"))?);
453 assert_eq!(storage.get(&id("000"))?, Some(data("hello")));
454 assert_eq!(
455 storage.head(&id("000")).map(|h| h.approximate_data_size),
456 Some(5)
457 );
458 assert!(storage.delete(&id("000"))?);
459 assert!(!storage.delete(&id("000"))?);
460 assert!(storage.get(&id("000"))?.is_none());
461 assert!(storage.head(&id("000")).is_none());
462
463 assert!(storage.put(&id("000"), &data("hello"))?);
464 assert!(storage.put(&id("111"), &data("world"))?);
465 for _ in 0..10 {
466 track!(storage.run_side_job_once())?;
467 assert!(storage.put(&id("222"), &data("quux"))?);
468 assert!(storage.delete(&id("222"))?);
469 }
470 mem::drop(storage);
471
472 let nvm = track!(FileNvm::open(dir.path().join("test.lusf")))?;
474 let storage = track!(Storage::open(nvm))?;
475 assert_eq!(storage.list(), vec![id("000"), id("111")]);
476 Ok(())
477 }
478
479 #[test]
480 fn full() -> TestResult {
481 let dir = track_io!(TempDir::new("cannyls_test"))?;
482
483 let nvm = track!(FileNvm::create(
484 dir.path().join("test.lusf"),
485 BlockSize::min().ceil_align(1024 * 1024)
486 ))?;
487 let mut storage = track!(Storage::create(nvm))?;
488
489 assert_eq!(
490 track!(storage.put(&id("000"), &zeroed_data(512 * 1024)))?,
491 true
492 );
493 assert_eq!(
494 storage.put(&id("000"), &zeroed_data(512 * 1024)).ok(),
495 Some(false)
496 );
497 assert_eq!(
498 storage
499 .put(&id("111"), &zeroed_data(512 * 1024))
500 .err()
501 .map(|e| *e.kind()),
502 Some(ErrorKind::StorageFull)
503 );
504
505 assert_eq!(storage.delete(&id("000")).ok(), Some(true));
506 assert_eq!(
507 storage.put(&id("111"), &zeroed_data(512 * 1024)).ok(),
508 Some(true)
509 );
510 Ok(())
511 }
512
513 #[test]
514 fn max_size_lump() -> TestResult {
515 let dir = track_io!(TempDir::new("cannyls_test"))?;
516
517 let nvm = track!(FileNvm::create(
518 dir.path().join("test.lusf"),
519 BlockSize::min().ceil_align(100 * 1024 * 1024)
520 ))?;
521 let mut storage = track!(Storage::create(nvm))?;
522
523 let data = zeroed_data(LumpData::MAX_SIZE);
524 assert_eq!(track!(storage.put(&id("000"), &data))?, true);
525 assert_eq!(track!(storage.get(&id("000")))?, Some(data));
526 Ok(())
527 }
528
    /// Parses `id` into a `LumpId`; panics if the string cannot be parsed.
    fn id(id: &str) -> LumpId {
        id.parse().unwrap()
    }
532
    /// Builds a `LumpData` from the bytes of `data` via `LumpData::new_embedded`;
    /// panics if that constructor fails.
    fn data(data: &str) -> LumpData {
        LumpData::new_embedded(Vec::from(data)).unwrap()
    }
536
537 fn zeroed_data(size: usize) -> LumpData {
538 let mut data = LumpData::aligned_allocate(size, BlockSize::min()).unwrap();
539 for v in data.as_bytes_mut() {
540 *v = 0;
541 }
542 data
543 }
544
    /// Checks that a storage whose on-disk header carries the previous minor
    /// version can be opened, and that the header then reports the current
    /// version both in memory and on disk.
    #[test]
    fn open_older_compatible_version_works() -> TestResult {
        let dir = track_io!(TempDir::new("cannyls_test"))?;
        let path = dir.path().join("test.lusf");

        // Create a storage and keep a copy of its header. The storage itself is
        // dropped at the end of this block.
        let mut header = {
            let nvm = track!(FileNvm::create(&path, 1024 * 1024))?;
            let storage = track!(Storage::create(nvm))?;
            let header = storage.header().clone();
            assert_eq!(header.major_version, MAJOR_VERSION);
            assert_eq!(header.minor_version, MINOR_VERSION);
            header
        };

        // Overwrite the header in the file with one whose minor version is
        // lowered by one.
        {
            header.minor_version = header
                .minor_version
                .checked_sub(1)
                .expect("このテストは`MINOR_VERSION >= 1`であることを前提としている");
            let file = track_any_err!(OpenOptions::new().write(true).open(&path))?;
            track!(header.write_to(file))?;
        }

        // Opening the storage succeeds and the in-memory header reports the
        // current version.
        {
            let nvm = track!(FileNvm::open(&path))?;
            let storage = track!(Storage::open(nvm))?;
            let header = storage.header().clone();
            assert_eq!(header.major_version, MAJOR_VERSION);
            assert_eq!(header.minor_version, MINOR_VERSION);
        }

        // The header read back from the file also reports the current version.
        {
            let file = track_any_err!(OpenOptions::new().read(true).open(&path))?;
            let header = track!(StorageHeader::read_from(file))?;
            assert_eq!(header.major_version, MAJOR_VERSION);
            assert_eq!(header.minor_version, MINOR_VERSION);
        }
        Ok(())
    }
588
589 #[test]
590 fn block_size_check_when_create() -> TestResult {
591 let nvm_block_size = track!(BlockSize::new(1024))?;
593 let storage_block_size = track!(BlockSize::new(1024))?;
594
595 let storage = track!(StorageBuilder::new()
596 .block_size(storage_block_size)
597 .create(memory_nvm(nvm_block_size)))?;
598 assert_eq!(storage.header().block_size, storage_block_size);
599
600 let nvm_block_size = track!(BlockSize::new(512))?;
602 let storage_block_size = track!(BlockSize::new(1024))?;
603
604 let storage = track!(StorageBuilder::new()
605 .block_size(storage_block_size)
606 .create(memory_nvm(nvm_block_size)))?;
607 assert_eq!(storage.header().block_size, storage_block_size);
608
609 let nvm_block_size = track!(BlockSize::new(1024))?;
611 let storage_block_size = track!(BlockSize::new(512))?;
612
613 assert!(StorageBuilder::new()
614 .block_size(storage_block_size)
615 .create(memory_nvm(nvm_block_size))
616 .is_err());
617
618 let nvm_block_size = track!(BlockSize::new(1024))?;
620 let storage_block_size = track!(BlockSize::new(1536))?;
621
622 assert!(StorageBuilder::new()
623 .block_size(storage_block_size)
624 .create(memory_nvm(nvm_block_size))
625 .is_err());
626
627 Ok(())
628 }
629
    /// Exercises `Storage::open` on a storage created with block size 1536
    /// while the block size of the underlying NVM is changed between attempts.
    #[test]
    fn block_size_check_when_open() -> TestResult {
        // Create a storage with NVM block size 1536 and storage block size 1536.
        let initial_nvm_block_size = track!(BlockSize::new(1536))?;
        let storage_block_size = track!(BlockSize::new(1536))?;
        let mut nvm = memory_nvm(initial_nvm_block_size);
        assert!(StorageBuilder::new()
            .block_size(storage_block_size)
            .create(nvm.clone())
            .is_ok());

        // NVM block size unchanged (1536): opening succeeds.
        let storage = track!(Storage::open(nvm.clone()))?;
        assert_eq!(storage.header().block_size, storage_block_size);

        // NVM block size 512: opening succeeds.
        nvm.set_block_size(track!(BlockSize::new(512))?);
        let storage = track!(Storage::open(nvm.clone()))?;
        assert_eq!(storage.header().block_size, storage_block_size);

        // NVM block size 2048: opening fails.
        nvm.set_block_size(track!(BlockSize::new(2048))?);
        assert!(Storage::open(nvm.clone()).is_err());

        // NVM block size 1024: opening fails.
        nvm.set_block_size(track!(BlockSize::new(1024))?);
        assert!(Storage::open(nvm).is_err());

        Ok(())
    }
660
    /// Creates a `SharedMemoryNvm` over a zero-filled buffer of 1024 * 1024
    /// bytes with the given block size.
    fn memory_nvm(block_size: BlockSize) -> SharedMemoryNvm {
        SharedMemoryNvm::with_block_size(vec![0; 1024 * 1024], block_size)
    }
664
665 fn is_put_with(entry: &JournalEntry, id: &LumpId) -> bool {
666 if let JournalRecord::Put(id_, _) = entry.record {
667 id_ == *id
668 } else {
669 false
670 }
671 }
672
673 fn is_delete_with(entry: &JournalEntry, id: &LumpId) -> bool {
674 if let JournalRecord::Delete(id_) = entry.record {
675 id_ == *id
676 } else {
677 false
678 }
679 }
680
681 #[test]
682 fn full_gc_works() -> TestResult {
683 let dir = track_io!(TempDir::new("cannyls_test"))?;
684
685 let nvm = track!(FileNvm::create(
686 dir.path().join("test.lusf"),
687 BlockSize::min().ceil_align(1024 * 1024)
688 ))?;
689 let mut storage = track!(Storage::create(nvm))?;
690
691 storage.set_automatic_gc_mode(false);
693
694 assert!(storage.put(&id("000"), &zeroed_data(42))?);
695 assert!(storage.put(&id("010"), &zeroed_data(42))?);
696
697 let entries = storage.journal_snapshot().unwrap().entries;
698
699 assert_eq!(entries.len(), 2);
700 assert!(is_put_with(entries.get(0).unwrap(), &id("000")));
701 assert!(is_put_with(entries.get(1).unwrap(), &id("010")));
702
703 storage.journal_gc().unwrap();
704
705 let new_entries = storage.journal_snapshot().unwrap().entries;
706
707 for (e1, e2) in entries.iter().zip(new_entries.iter()) {
708 assert_eq!(e1.record, e2.record);
709 }
713
714 assert!(storage.delete(&id("000"))?);
715 assert!(storage.delete(&id("010"))?);
716
717 let entries = storage.journal_snapshot().unwrap().entries;
718
719 assert_eq!(entries.len(), 4);
720
721 assert!(is_put_with(entries.get(0).unwrap(), &id("000")));
722 assert!(is_put_with(entries.get(1).unwrap(), &id("010")));
723 assert!(is_delete_with(entries.get(2).unwrap(), &id("000")));
724 assert!(is_delete_with(entries.get(3).unwrap(), &id("010")));
725
726 storage.journal_gc().unwrap();
727
728 let entries = storage.journal_snapshot().unwrap().entries;
729
730 assert_eq!(entries.len(), 0);
731
732 Ok(())
733 }
734
    /// Walks the journal positions of a small (4096-sized) journal region
    /// through two explicit GCs. The second GC leaves `tail` at a smaller
    /// position than `head`.
    #[test]
    fn journal_overflow_example() -> TestResult {
        let dir = track_io!(TempDir::new("cannyls_test"))?;

        let nvm = track!(FileNvm::create(
            dir.path().join("test.lusf"),
            BlockSize::min().ceil_align(1024 * 400)
        ))?;
        let mut storage = track!(StorageBuilder::new().journal_region_ratio(0.01).create(nvm))?;
        storage.set_automatic_gc_mode(false);

        // With this capacity and ratio the journal region size is 4096.
        {
            let header = storage.header();
            assert_eq!(header.journal_region_size, 4096);
        }

        // 60 puts followed by 20 deletes; 40 lumps remain.
        for i in 0..60 {
            assert!(storage.put(&id(&i.to_string()), &zeroed_data(42))?);
        }
        for i in 0..20 {
            assert!(storage.delete(&id(&i.to_string()))?);
        }
        // Nothing has been collected yet: both heads are 0 and the tail is 2100.
        {
            let snapshot = track!(storage.journal_snapshot())?;
            assert_eq!(snapshot.unreleased_head, 0);
            assert_eq!(snapshot.head, 0);
            assert_eq!(snapshot.tail, 2100);
        }

        // First GC: the heads move to the old tail (2100) and the tail advances to 3220.
        track!(storage.journal_gc())?;
        {
            let snapshot = track!(storage.journal_snapshot())?;
            assert_eq!(snapshot.unreleased_head, 2100);
            assert_eq!(snapshot.head, 2100);
            assert_eq!(snapshot.tail, 3220);
        }

        // Second GC: the heads move to 3220 and the tail ends up at 784, i.e.
        // at a smaller position than the head.
        track!(storage.journal_gc())?;
        {
            let snapshot = track!(storage.journal_snapshot())?;
            assert_eq!(snapshot.unreleased_head, 3220);
            assert_eq!(snapshot.head, 3220);
            assert_eq!(snapshot.tail, 784);
        }

        Ok(())
    }
782
    /// Regression test (named after "PR 23"): checks the journal positions
    /// reported by `journal_snapshot` across side jobs, drops and reopens of
    /// the storage.
    #[test]
    fn confirm_that_the_problem_of_pr23_is_resolved() -> TestResult {
        let dir = track_io!(TempDir::new("cannyls_test"))?;

        let nvm = track!(FileNvm::create(
            dir.path().join("test.lusf"),
            BlockSize::min().ceil_align(1024 * 100 * 4)
        ))?;
        let mut storage = track!(StorageBuilder::new().journal_region_ratio(0.01).create(nvm))?;
        assert_eq!(storage.header().journal_region_size, 4096);
        storage.set_automatic_gc_mode(false);

        let test_lump_id = id("55");

        // Put a 10-byte embedded lump, then run the side job four times.
        let vec: Vec<u8> = vec![42; 10];
        let lump_data = track!(LumpData::new_embedded(vec))?;
        track!(storage.put(&test_lump_id, &lump_data))?;
        track!(storage.run_side_job_once())?;
        track!(storage.run_side_job_once())?;
        track!(storage.run_side_job_once())?;
        track!(storage.run_side_job_once())?;
        // At this point `head` (66) is ahead of `unreleased_head` (33).
        {
            let snapshot = storage.journal_snapshot().unwrap();
            assert_eq!(snapshot.unreleased_head, 33);
            assert_eq!(snapshot.head, 66);
            assert_eq!(snapshot.tail, 66);
        }

        // Drop the storage and reopen it from the same file.
        std::mem::drop(storage);
        let nvm = track!(FileNvm::open(dir.path().join("test.lusf")))?;
        let mut storage = track!(Storage::open(nvm))?;
        storage.set_automatic_gc_mode(false);
        // After reopening, `head` is reported as 33, equal to `unreleased_head`.
        {
            let snapshot = storage.journal_snapshot().unwrap();
            assert_eq!(snapshot.unreleased_head, 33);
            assert_eq!(snapshot.head, 33);
            assert_eq!(snapshot.tail, 66);
        }

        // Three rounds of putting a 1000-byte embedded lump and deleting it,
        // followed by three side-job runs.
        for _ in 0..3 {
            let vec: Vec<u8> = vec![42; 1000];
            let lump_data = track!(LumpData::new_embedded(vec))?;
            track!(storage.put(&test_lump_id, &lump_data))?;
            track!(storage.delete(&test_lump_id))?;
        }
        track!(storage.run_side_job_once())?;
        track!(storage.run_side_job_once())?;
        track!(storage.run_side_job_once())?;
        // All three positions now coincide at 3198.
        {
            let snapshot = storage.journal_snapshot().unwrap();
            assert_eq!(snapshot.unreleased_head, 3198);
            assert_eq!(snapshot.head, 3198);
            assert_eq!(snapshot.tail, 3198);
        }
        // Put a 2000-byte embedded lump; afterwards the tail (2023) is at a
        // smaller position than the head (3198).
        let vec: Vec<u8> = vec![42; 2000];
        let lump_data = track!(LumpData::new_embedded(vec))?;
        track!(storage.put(&test_lump_id, &lump_data))?;
        {
            let snapshot = storage.journal_snapshot().unwrap();
            assert_eq!(snapshot.unreleased_head, 3198);
            assert_eq!(snapshot.head, 3198);
            assert_eq!(snapshot.tail, 2023);
        }

        // Reopening in this state must succeed and report the same positions.
        std::mem::drop(storage);
        let nvm = track!(FileNvm::open(dir.path().join("test.lusf")))?;
        let mut storage = track!(Storage::open(nvm))?;
        {
            let snapshot = storage.journal_snapshot().unwrap();
            assert_eq!(snapshot.unreleased_head, 3198);
            assert_eq!(snapshot.head, 3198);
            assert_eq!(snapshot.tail, 2023);
        }

        Ok(())
    }
896}