1#![deny(missing_docs)]
80#![deny(broken_intra_doc_links)]
81#![deny(unsafe_code)]
82
83use crate::{storage::Storage, timestamp::timestamp_now_monotonic};
84use crc32fast::Hasher;
85use log::warn;
86use serde::{de::DeserializeOwned, Serialize};
87use std::{cmp::max, collections::HashMap, mem};
88use tokio::sync::{Mutex, MutexGuard};
89
/// Storage backends: provides the `Storage` trait the store is generic over and the storage
/// error type wrapped by `Error::StorageError`.
pub mod storage;
/// Timestamp helpers: provides `timestamp_now_monotonic`, which is used to stamp snapshots and
/// commits.
pub mod timestamp;
92
/// Number of little-endian bytes of the commit timestamp stored as the value of a
/// transaction-commit entry.
const BYTES_TIMESTAMP_FULL: usize = 6;
/// Number of CRC bytes read after the value of a transaction-commit entry.
const BYTES_CRC: usize = 4;
95
/// Errors returned by the key-value store.
#[derive(Debug)]
pub enum Error {
    /// An error reported by the underlying storage.
    StorageError(storage::Error),
    /// Corrupt data was detected.
    ///
    /// NOTE(review): this variant is not constructed in this file; the `u64` is presumably the
    /// offset of the corrupt data — confirm.
    CorruptDataError(u64),
    /// An entry read from the storage could not be decoded.
    InvalidEntryError {
        /// Human-readable description of what is wrong with the entry.
        reason: String,
    },
    /// A key could not be serialized or deserialized.
    InvalidKeyError {
        /// Human-readable description of the failure.
        reason: String,
    },
    /// A value could not be serialized or deserialized.
    InvalidValueError {
        /// Human-readable description of the failure.
        reason: String,
    },
    /// A size needs more bytes to be encoded than the entry format allows.
    MaxSizeExceeded {
        /// The size that was supposed to be encoded.
        size: usize,
        /// The maximum number of bytes available for encoding the size.
        max_bytes: u8,
        /// The number of bytes that would be required to encode the size.
        bytes_required: u8,
    },
    /// A byte slice was too long to be decoded into an integer.
    InvalidIntLength {
        /// The maximum number of bytes the integer type can hold.
        bytes_expected: u8,
        /// The number of bytes that were actually supplied.
        bytes_found: u8,
    },
    /// The storage lock could not be used.
    ///
    /// NOTE(review): this variant is not constructed in this file — confirm its exact meaning.
    StorageLockError,
    /// A key that was read through the committing snapshot was changed by another commit after
    /// the snapshot had been taken.
    TransactionConflict,
}
139
/// Result type used throughout this crate; the error variant is this crate's `Error`.
pub type Result<T> = std::result::Result<T, Error>;
142
143impl<'a> From<storage::Error> for Error {
144 fn from(e: storage::Error) -> Self {
145 Error::StorageError(e)
146 }
147}
148
/// A key-value store that persists its entries in a `Storage` and keeps an in-memory index of
/// where each version of each key is located.
pub struct KvStore<S: Storage> {
    /// Name of the store, copied from the storage when the store was opened.
    name: String,
    /// The underlying storage that holds the encoded entries.
    storage: Mutex<S>,
    /// Index from blob key (serialized key followed by the slot byte) to the committed versions
    /// of that key, in the order in which they were indexed.
    offsets: Mutex<HashMap<Vec<u8>, Vec<BlobVersion>>>,
    /// Timestamp of the most recent commit known to the store; `0` if there is none.
    latest_timestamp: Mutex<u64>,
}
176
177impl<S: Storage> KvStore<S> {
178 pub async fn open(storage: S) -> Result<Self> {
187 let mut store = Self {
188 name: String::from(storage.name()),
189 storage: Mutex::new(storage),
190 offsets: Mutex::new(HashMap::new()),
191 latest_timestamp: Mutex::new(0),
192 };
193 init_store(&mut store).await?;
194 Ok(store)
195 }
196
197 pub fn name(&self) -> &str {
199 &self.name
200 }
201
202 pub fn into_storage(self) -> Result<S> {
204 Ok(self.storage.into_inner())
205 }
206
207 pub async fn len(&self) -> u64 {
209 self.storage.lock().await.len()
210 }
211
212 pub async fn is_empty(&self) -> bool {
214 self.len().await == 0
215 }
216
217 pub async fn current(&self) -> Snapshot<'_, S> {
220 let latest_timestamp = *self.latest_timestamp.lock().await;
221 let latest_offset = self.storage.lock().await.len();
222 let snapshot_timestamp = timestamp_now_monotonic(latest_timestamp);
223 Snapshot {
224 store: self,
225 snapshot_timestamp,
226 latest_timestamp,
227 latest_offset,
228 cached_entries: Mutex::new(HashMap::new()),
229 transaction_entries: HashMap::new(),
230 }
231 }
232
233 pub async fn merge(&mut self) -> Result<()> {
240 {
241 self.storage.lock().await.flush().await?;
242 self.storage.lock().await.start_merge().await?;
243
244 let mut crc = Hasher::new();
245 let mut offset = 0;
246 let mut storage = self.storage.lock().await;
247 while offset < storage.len() {
248 let mut entry = Entry::read_from(&mut storage, offset).await?;
249 let entry_length = entry.len() as u64;
250 let offsets = &mut (*self.offsets.lock().await);
251
252 if let Some(k) = entry.key.as_ref() {
254 if offset == offsets[k].last().unwrap().offset {
255 entry.update_crc(&mut crc);
256 entry.write_to(&mut storage).await?;
257 }
258 } else if entry.is_transaction_commit() {
259 entry.update_crc(&mut crc);
260 let crc_merged = crc.finalize();
261 let crc_original = entry.crc()?;
262 if crc_merged != crc_original {
263 entry.set_crc(crc_merged);
264 }
265 entry.write_to(&mut storage).await?;
266 crc = Hasher::new();
267 }
268 offset += entry_length as u64;
269 }
270
271 storage.flush().await?;
272 storage.stop_merge().await?;
273 storage.flush().await?;
274 }
275 init_store(self).await?;
276 Ok(())
277 }
278}
279
/// Upper bound that decides which committed versions are visible to a snapshot.
#[derive(Debug, Copy, Clone)]
enum SnapshotBoundary {
    /// Versions with a commit timestamp less than or equal to this timestamp are visible.
    Timestamp(u64),
    /// Versions stored at an offset below this offset are visible.
    Offset(u64),
}
285
/// A view of a `KvStore` as of the moment the snapshot was taken, which also collects the
/// changes of a transaction until they are committed or aborted.
pub struct Snapshot<'a, S: Storage> {
    /// The store this snapshot was taken from.
    store: &'a KvStore<S>,
    /// Timestamp assigned to the snapshot when it was taken; reported as the timestamp of
    /// uncommitted versions.
    snapshot_timestamp: u64,
    /// Timestamp of the latest commit of the store at the time the snapshot was taken.
    latest_timestamp: u64,
    /// Length of the storage at the time the snapshot was taken.
    latest_offset: u64,
    /// Uncommitted changes by blob key: `Some(value)` for an insert, `None` for a removal.
    transaction_entries: HashMap<Vec<u8>, Option<Vec<u8>>>,
    /// Values read through this snapshot, by blob key and version. The keys of this map are
    /// also what `commit` checks for conflicting writes.
    cached_entries: Mutex<HashMap<Vec<u8>, ValuesByVersion>>,
}
324
/// Cached raw values of a single key by version; `None` is cached for entries without a value.
type ValuesByVersion = HashMap<Version, Option<Vec<u8>>>;
326
327impl<'a, S: Storage> Snapshot<'a, S> {
328 pub fn name(&self) -> &str {
330 self.store.name()
331 }
332
333 pub async fn get<K, V>(&self, slot: u8, k: &K) -> Result<Option<V>>
338 where
339 K: Serialize,
340 V: DeserializeOwned,
341 {
342 let versions = self.versions(slot, k).await?;
343 blob_to_serde_value(self.get_bytes(slot, k, versions.last().copied()).await?)
344 }
345
346 pub async fn get_unremoved<K, V>(&self, slot: u8, k: &K) -> Result<Option<V>>
356 where
357 K: Serialize,
358 V: DeserializeOwned,
359 {
360 let versions = self.versions(slot, k).await?;
361 let unremoved = versions.iter().filter(|v| !v.is_removed);
362 blob_to_serde_value(self.get_bytes(slot, k, unremoved.last().copied()).await?)
363 }
364
365 pub async fn get_version<K, V>(&self, slot: u8, k: &K, version: Version) -> Result<Option<V>>
367 where
368 K: Serialize,
369 V: DeserializeOwned,
370 {
371 blob_to_serde_value(self.get_bytes(slot, k, Some(version)).await?)
372 }
373
374 async fn get_bytes<K>(&self, slot: u8, k: &K, v: Option<Version>) -> Result<Option<Vec<u8>>>
375 where
376 K: Serialize,
377 {
378 let k = &serde_to_blob_key(slot, k)?;
379 let mut cached_entries = self.cached_entries.lock().await;
380 if !cached_entries.contains_key(k) {
381 cached_entries.insert(k.clone(), HashMap::new());
382 }
383 if let Some(version) = v {
384 if let Some(entry) = self.transaction_entries.get(k) {
385 if !version.is_committed {
386 return Ok(entry.clone());
387 }
388 }
389 let versions = cached_entries.get_mut(k).unwrap();
390 if let Some(entry) = versions.get(&version) {
391 return Ok(entry.clone());
392 }
393
394 if let Some(offset) = version.offset {
395 let entry = Entry::read_from(&mut self.store.storage.lock().await, offset).await?;
396 versions.insert(version, entry.val.clone());
397 Ok(entry.val)
398 } else {
399 Ok(None)
400 }
401 } else {
402 Ok(None)
403 }
404 }
405
406 pub async fn versions<K>(&self, slot: u8, k: &K) -> Result<Vec<Version>>
412 where
413 K: Serialize,
414 {
415 let k = &serde_to_blob_key(slot, k)?;
416 let up_until = self.latest_time_or_offset();
417 let mut versions: Vec<Version> =
418 versions_up_until(self.store.offsets.lock().await.get(k), up_until)
419 .into_iter()
420 .map(|v| v.into())
421 .collect();
422 if let Some(entry) = self.transaction_entries.get(k) {
423 versions.push(Version {
424 offset: None,
425 is_committed: false,
426 is_removed: entry.is_none(),
427 timestamp: self.snapshot_timestamp,
428 });
429 }
430 Ok(versions)
431 }
432
433 pub async fn last_updated(&self) -> Result<Option<u64>> {
436 Ok(if !self.transaction_entries.is_empty() {
437 Some(self.snapshot_timestamp)
438 } else if self.latest_timestamp > 0 {
439 Some(self.latest_timestamp)
440 } else {
441 None
442 })
443 }
444
445 pub async fn keys<K: DeserializeOwned>(&self, slot: u8) -> Result<Vec<K>> {
450 let mut keys: Vec<K> = Vec::new();
451 for (key, versions) in self.store.offsets.lock().await.iter() {
452 let versions = versions_up_until(Some(versions), self.latest_time_or_offset());
453 if let Some(s) = key.last() {
454 if *s == slot && !versions.last().unwrap().is_removed {
455 let key = rmp_serde::decode::from_read(&key[..key.len() - 1]).map_err(|e| {
456 Error::InvalidKeyError {
457 reason: format!("{}", e),
458 }
459 });
460 keys.push(key?);
461 }
462 }
463 }
464 Ok(keys)
465 }
466
467 pub fn insert<K, V>(&mut self, slot: u8, k: K, v: V) -> Result<()>
476 where
477 K: Serialize,
478 V: Serialize,
479 {
480 let v = serde_to_blob_value(&v)?;
481 self.insert_bytes(slot, k, v)?;
482 Ok(())
483 }
484
485 fn insert_bytes<K>(&mut self, slot: u8, k: K, v: Vec<u8>) -> Result<()>
486 where
487 K: Serialize,
488 {
489 let k = serde_to_blob_key(slot, &k)?;
490 self.transaction_entries.insert(k, Some(v));
491 Ok(())
492 }
493
494 pub fn remove<K>(&mut self, slot: u8, k: K) -> Result<()>
506 where
507 K: Serialize,
508 {
509 let k = serde_to_blob_key(slot, &k)?;
510 self.transaction_entries.insert(k, None);
511 Ok(())
512 }
513
514 pub async fn abort(mut self) -> Result<()> {
516 self.transaction_entries.clear();
518 Ok(())
519 }
520
521 pub async fn commit(mut self) -> Result<()> {
524 let entries = mem::take(&mut self.transaction_entries);
525 if entries.is_empty() {
526 return Ok(());
527 }
528 let mut storage = self.store.storage.lock().await;
529 let mut offsets = self.store.offsets.lock().await;
530 {
531 for k in self.cached_entries.lock().await.keys() {
532 if let Some(versions) = offsets.get(k) {
533 let version = versions
534 .last()
535 .unwrap_or_else(|| panic!("could not find last version of key {:?}", k));
536
537 if version.offset >= self.latest_offset {
542 return Err(Error::TransactionConflict);
543 }
544 }
545 }
546 }
547
548 let mut crc = Hasher::new();
549 let mut uncommitted_offsets = Vec::with_capacity(entries.len());
550 for (k, buf) in entries.into_iter() {
551 if let Some(buf) = buf {
552 let entry = Entry::kv_insert(k, buf)?;
553 let offset = entry.write_to(&mut storage).await?;
554 entry.update_crc(&mut crc);
555 uncommitted_offsets.push((entry.key.unwrap(), offset, false));
556 } else {
557 let entry = Entry::kv_remove(k)?;
558 let offset = entry.write_to(&mut storage).await?;
559 entry.update_crc(&mut crc);
560 uncommitted_offsets.push((entry.key.unwrap(), offset, true));
561 }
562 }
563
564 let t_commit = timestamp_now_monotonic(self.latest_timestamp);
565 let mut entry = Entry::transaction_commit(t_commit)?;
566 entry.update_crc(&mut crc);
567 entry.set_crc(crc.finalize());
568 entry.write_to(&mut storage).await?;
569
570 for (k, offset, is_removed) in uncommitted_offsets {
571 offsets
572 .entry(k.to_vec())
573 .or_insert_with(Vec::new)
574 .push(BlobVersion {
575 offset,
576 is_removed,
577 timestamp: t_commit,
578 });
579 }
580 *self.store.latest_timestamp.lock().await = t_commit;
581 storage.flush().await?;
582 Ok(())
583 }
584
585 fn latest_time_or_offset(&self) -> SnapshotBoundary {
586 if self.snapshot_timestamp == self.latest_timestamp {
587 SnapshotBoundary::Offset(self.latest_offset)
588 } else {
589 SnapshotBoundary::Timestamp(self.latest_timestamp)
590 }
591 }
592}
593
594impl<S: Storage> Drop for Snapshot<'_, S> {
595 fn drop(&mut self) {
596 if !self.transaction_entries.is_empty() {
597 warn!("Snapshot with changes was dropped without being committed!");
598 }
599 }
600}
601
602async fn init_store<S: Storage>(store: &mut KvStore<S>) -> Result<()> {
603 let mut uncommitted = Vec::new();
604 let mut crc = Hasher::new();
605 let mut latest_timestamp = 0;
606 let mut offset = 0;
607 let max_offset = store.len().await;
608 while offset < max_offset {
609 let entry = Entry::read_from(&mut store.storage.lock().await, offset).await?;
610 let entry_length = entry.len() as u64;
611 let offsets = &mut (*store.offsets.lock().await);
612
613 if !entry.is_transaction() {
614 entry.update_crc(&mut crc);
615 uncommitted.push((entry.key.unwrap(), offset, entry.val.is_none()));
616 } else if entry.is_transaction_commit() {
617 entry.update_crc(&mut crc);
618 let crc_kv_writes = crc.finalize();
619 let crc_commit = entry.crc()?;
620 if crc_kv_writes != crc_commit {
621 warn!("Truncating corrupt store at offset {}", offset);
622 store
623 .storage
624 .lock()
625 .await
626 .truncate(offset)
627 .await
628 .expect("Error while truncating storage to remove corrupt data");
629 break;
630 }
631
632 let timestamp_commit = u64_from_bytes(entry.val.as_ref().unwrap())?;
633 for (k, offset, is_removed) in uncommitted.iter() {
634 offsets
635 .entry(k.to_vec())
636 .or_insert_with(Vec::new)
637 .push(BlobVersion {
638 offset: *offset,
639 is_removed: *is_removed,
640 timestamp: timestamp_commit,
641 });
642 }
643 uncommitted.clear();
644 crc = Hasher::new();
645 latest_timestamp = max(latest_timestamp, timestamp_commit);
646 }
647
648 offset += entry_length as u64;
649 }
650 store.latest_timestamp = Mutex::new(latest_timestamp);
651 Ok(())
652}
653
654fn serde_to_blob_key(slot: u8, k: &impl Serialize) -> Result<Vec<u8>> {
655 rmp_serde::encode::to_vec(k)
656 .map(|mut v| {
657 v.push(slot);
658 v
659 })
660 .map_err(|e| Error::InvalidKeyError {
661 reason: format!("Unable to serialize: {}", e),
662 })
663}
664
665fn serde_to_blob_value(v: &impl Serialize) -> Result<Vec<u8>> {
666 rmp_serde::encode::to_vec(v).map_err(|e| Error::InvalidValueError {
667 reason: format!("Unable to serialize: {}", e),
668 })
669}
670
671fn blob_to_serde_value<V>(v: Option<Vec<u8>>) -> Result<Option<V>>
672where
673 V: DeserializeOwned,
674{
675 v.map(|v| {
676 rmp_serde::decode::from_read(&v[..]).map_err(|e| Error::InvalidValueError {
677 reason: format!("{}", e),
678 })
679 })
680 .transpose()
681}
682
/// Index record for one committed version of a key.
#[derive(Debug, Copy, Clone)]
struct BlobVersion {
    /// Offset of the key/value entry in the storage.
    offset: u64,
    /// Whether this version is a removal, i.e. the entry has no value.
    is_removed: bool,
    /// Timestamp of the commit entry that sealed the transaction this version belongs to.
    timestamp: u64,
}
689
690fn versions_up_until(
691 versions: Option<&Vec<BlobVersion>>,
692 up_until: SnapshotBoundary,
693) -> Vec<BlobVersion> {
694 versions.map_or(Vec::new(), |v| {
695 v.iter()
696 .filter(|v| match up_until {
697 SnapshotBoundary::Timestamp(t) => v.timestamp <= t,
698 SnapshotBoundary::Offset(o) => v.offset < o,
699 })
700 .cloned()
701 .collect()
702 })
703}
704
/// Raw bytes of a value as stored in an entry.
type Value = Vec<u8>;
706
/// A single record in the storage: either a key/value entry (insert or removal) or a
/// transaction entry, which has no key.
#[derive(Debug)]
struct Entry {
    /// Header byte: the lowest three bits hold the number of bytes used to encode the value
    /// size, the next two bits the number of bytes used to encode the key size.
    header: u8,
    /// Little-endian key size followed by little-endian value size, using the byte counts
    /// given in the header.
    sizes: Vec<u8>,
    /// The blob key; `None` for transaction entries.
    key: Option<Vec<u8>>,
    /// The value: the serialized value of an insert, the timestamp bytes of a commit entry, or
    /// `None` for a removal.
    val: Option<Vec<u8>>,
    /// CRC bytes stored after the value; only present on transaction-commit entries.
    crc: Option<Vec<u8>>,
}
715
716impl Entry {
723 fn transaction_commit(timestamp: u64) -> Result<Self> {
724 let mut buf = vec![0; BYTES_TIMESTAMP_FULL];
725 buf.copy_from_slice(×tamp.to_le_bytes()[..BYTES_TIMESTAMP_FULL]);
726 Self::new(None, Some(buf))
727 }
728
729 fn kv_insert(k: Vec<u8>, v: Value) -> Result<Self> {
730 Self::new(Some(k), Some(v))
731 }
732
733 fn kv_remove(k: Vec<u8>) -> Result<Self> {
734 Self::new(Some(k), None)
735 }
736
737 fn new(k: Option<Vec<u8>>, v: Option<Value>) -> Result<Self> {
738 let key_size = k.as_ref().map_or(0, |k| k.len());
739 let bytes_key_size = k
740 .as_ref()
741 .map_or(Ok(0), |k| bytes_required_for(k.len(), 3))?;
742 let val_size = v.as_ref().map_or(0, |v| v.len());
743 let bytes_val_size = v
744 .as_ref()
745 .map_or(Ok(0), |v| bytes_required_for(v.len(), 6))?;
746 let header = (bytes_key_size << 3) | bytes_val_size;
747
748 let mut sizes = vec![0; (bytes_key_size + bytes_val_size) as usize];
749 sizes[..bytes_key_size as usize]
750 .copy_from_slice(&key_size.to_le_bytes()[0..bytes_key_size as usize]);
751 sizes[bytes_key_size as usize..]
752 .copy_from_slice(&val_size.to_le_bytes()[0..bytes_val_size as usize]);
753
754 Ok(Self {
755 header,
756 sizes,
757 key: k,
758 val: v,
759 crc: None,
760 })
761 }
762
763 async fn read_from<S: Storage>(storage: &mut MutexGuard<'_, S>, offset: u64) -> Result<Self> {
764 let max_length_of_header_and_sizes = 1 + 3 + 6;
765 let mut header_and_sizes = storage.read(offset, max_length_of_header_and_sizes).await?;
766 if header_and_sizes.is_empty() {
767 return Err(Error::InvalidEntryError {
768 reason: "Offset exceeds storage bounds".to_string(),
769 });
770 }
771 let header = header_and_sizes.remove(0);
772 let mut sizes = header_and_sizes;
773 let bytes_val_size = (header & 0b111) as u32;
774 let bytes_key_size = ((header & 0b11000) >> 3) as u32;
775 if bytes_key_size > 3 {
776 return Err(Error::InvalidEntryError {
777 reason: format!(
778 "Key size can have a maximum of 3 bytes, but has {}",
779 bytes_key_size
780 ),
781 });
782 }
783 if bytes_val_size > 6 {
784 return Err(Error::InvalidEntryError {
785 reason: format!(
786 "Value size can have a maximum of 6 bytes, but has {}",
787 bytes_val_size
788 ),
789 });
790 }
791
792 let offset_sizes = offset + 1;
793 let bytes_sizes = bytes_key_size + bytes_val_size;
794 if sizes.len() < bytes_key_size as usize {
795 return Err(Error::InvalidEntryError {
796 reason: "Invalid length of entry size buffer".to_string(),
797 });
798 }
799 sizes.truncate(bytes_sizes as usize);
800
801 let key_size = u32_from_bytes(&sizes[..bytes_key_size as usize])?;
802 let val_size = u32_from_bytes(&sizes[bytes_key_size as usize..])?;
803 let offset_content = offset_sizes + bytes_sizes as u64;
804
805 if key_size > (1 << 16) {
806 return Err(Error::InvalidEntryError {
807 reason: "Key size is > max size of 2^16 bytes".to_string(),
808 });
809 }
810 if val_size > (1 << 24) {
811 return Err(Error::InvalidEntryError {
812 reason: "Value size is > max size of 2^24 bytes".to_string(),
813 });
814 }
815
816 let is_transaction = bytes_key_size == 0;
817 let (key, val, crc) = if is_transaction {
818 if val_size > 0 {
819 let bytes_content = key_size + val_size + BYTES_CRC as u32;
820 let content = storage.read(offset_content, bytes_content).await?;
821 if content.len() < val_size as usize {
822 return Err(Error::InvalidEntryError {
823 reason: "Invalid length of entry content buffer".to_string(),
824 });
825 }
826 (
827 None,
828 Some(content[..val_size as usize].to_vec()),
829 Some(content[val_size as usize..].to_vec()),
830 )
831 } else {
832 (None, None, None)
833 }
834 } else {
835 let bytes_content = key_size + val_size;
836 let content = storage.read(offset_content, bytes_content).await?;
837 if content.len() < key_size as usize {
838 return Err(Error::InvalidEntryError {
839 reason: "Invalid length of entry content buffer".to_string(),
840 });
841 }
842 let key = Some(content[..key_size as usize].to_vec());
843 if val_size > 0 {
844 (key, Some(content[key_size as usize..].to_vec()), None)
845 } else {
846 (key, None, None)
847 }
848 };
849 Ok(Self {
850 header,
851 sizes,
852 key,
853 val,
854 crc,
855 })
856 }
857
858 async fn write_to<S: Storage>(&self, storage: &mut MutexGuard<'_, S>) -> Result<u64> {
859 let offset = storage.write(&[self.header]).await?;
860 storage.write(&self.sizes).await?;
861 if let Some(k) = &self.key {
862 storage.write(k).await?;
863 }
864 if let Some(v) = &self.val {
865 storage.write(v).await?;
866 }
867 if let Some(crc) = &self.crc {
868 storage.write(crc).await?;
869 }
870 Ok(offset)
871 }
872
873 fn set_crc(&mut self, crc: u32) {
874 self.crc = Some(crc.to_le_bytes().to_vec());
875 }
876
877 fn crc(&self) -> Result<u32> {
878 if !self.is_transaction_commit() {
879 panic!("Trying to read CRC value from a non-commit entry");
880 }
881 u32_from_bytes(self.crc.as_ref().unwrap())
882 }
883
884 fn update_crc(&self, crc: &mut Hasher) {
885 crc.update(&[self.header]);
886 crc.update(&self.sizes);
887 if let Some(k) = &self.key {
888 crc.update(k);
889 }
890 if let Some(v) = &self.val {
891 crc.update(v);
892 }
893 }
894
895 fn len(&self) -> usize {
896 1 + self.sizes.len()
897 + self.key.as_ref().map_or(0, |k| k.len())
898 + self.val.as_ref().map_or(0, |v| v.len())
899 + self.crc.as_ref().map_or(0, |crc| crc.len())
900 }
901
902 fn is_transaction(&self) -> bool {
903 self.key.is_none()
904 }
905
906 fn is_transaction_commit(&self) -> bool {
907 self.is_transaction() && self.val.as_ref().is_some()
908 }
909}
910
911fn u64_from_bytes(bytes: &[u8]) -> Result<u64> {
912 if bytes.len() > 8 {
913 Err(Error::InvalidIntLength {
914 bytes_expected: 8,
915 bytes_found: bytes.len() as u8,
916 })
917 } else {
918 let mut buf = [0; 8];
919 buf[..bytes.len()].copy_from_slice(bytes);
920 Ok(u64::from_le_bytes(buf))
921 }
922}
923
924fn u32_from_bytes(bytes: &[u8]) -> Result<u32> {
925 if bytes.len() > 4 {
926 Err(Error::InvalidIntLength {
927 bytes_expected: 4,
928 bytes_found: bytes.len() as u8,
929 })
930 } else {
931 let mut buf = [0; 4];
932 buf[..bytes.len()].copy_from_slice(bytes);
933 Ok(u32::from_le_bytes(buf))
934 }
935}
936
937fn bytes_required_for(n: usize, max_bytes: u8) -> Result<u8> {
938 let zero: usize = 0;
939 let bit_length = zero.leading_zeros() - n.leading_zeros();
940 let mut bytes_required = (bit_length / 8) as u8;
941 if bit_length == 0 || bit_length % 8 != 0 {
942 bytes_required += 1;
943 };
944 if bytes_required > max_bytes {
945 Err(Error::MaxSizeExceeded {
946 size: n,
947 max_bytes,
948 bytes_required,
949 })
950 } else {
951 Ok(bytes_required)
952 }
953}
954
955#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
957pub struct Version {
958 offset: Option<u64>,
959 pub is_committed: bool,
961 pub is_removed: bool,
963 pub timestamp: u64,
965}
966
967impl From<BlobVersion> for Version {
968 fn from(v: BlobVersion) -> Self {
969 Self {
970 offset: Some(v.offset),
971 is_committed: true,
972 is_removed: v.is_removed,
973 timestamp: v.timestamp,
974 }
975 }
976}