1use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};
17
18use allocative::Allocative;
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{data_types::ArithmeticError, ensure, visit_allocative_simple};
22use serde::{Deserialize, Serialize};
23
24use crate::{
25 batch::{Batch, WriteOperation},
26 common::{
27 from_bytes_option, get_key_range_for_prefix, get_upper_bound, DeletionSet, HasherOutput,
28 SuffixClosedSetIterator, Update,
29 },
30 context::Context,
31 hashable_wrapper::WrappedHashableContainerView,
32 historical_hash_wrapper::HistoricallyHashableView,
33 store::ReadableKeyValueStore,
34 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
35};
36
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus latency histograms for the operations of `KeyValueStoreView`.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// The latency of hashing the view.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// The latency of reading a single value.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// The latency of reading several values at once.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of testing the existence of a single key.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of testing the existence of several keys at once.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of finding the keys matching a prefix.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of finding the key/value pairs matching a prefix.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// The latency of applying a batch of writes to the view.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });
}
130
131#[cfg(with_testing)]
132use {
133 crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
134 async_lock::RwLock,
135 std::sync::Arc,
136 thiserror::Error,
137};
138
/// Key tags distinguishing the kinds of keys this view writes under its base key.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the indices (user keys) of the view.
    Index = MIN_VIEW_TAG,
    /// Prefix for the stored hash of the view.
    // NOTE(review): the values `MIN_VIEW_TAG + 1` and `+ 2` are skipped — they
    // appear reserved by an earlier key layout. Confirm before reusing them.
    Hash = MIN_VIEW_TAG + 3,
}
146
/// A pair of sizes, in bytes, for a key and its associated value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Allocative)]
pub struct SizeData {
    /// The size of the key, in bytes.
    pub key: u32,
    /// The size of the value, in bytes.
    pub value: u32,
}
155
156impl SizeData {
157 pub fn sum(&mut self) -> u32 {
159 self.key + self.value
160 }
161
162 pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
164 self.key = self
165 .key
166 .checked_add(size.key)
167 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
168 self.value = self
169 .value
170 .checked_add(size.value)
171 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
172 Ok(())
173 }
174
175 pub fn sub_assign(&mut self, size: SizeData) {
177 self.key -= size.key;
178 self.value -= size.value;
179 }
180}
181
/// A view exposing a key/value-store interface over the keys located under the
/// context's base key. Mutations are buffered in memory (`updates` and
/// `deletion_set`) until they are flushed by `pre_save`/`post_save`.
#[derive(Debug, Allocative)]
#[allocative(bound = "C")]
pub struct KeyValueStoreView<C> {
    // The context carrying the base key and the underlying store.
    #[allocative(skip)]
    context: C,
    // Pending prefix deletions, including a full clear via `delete_storage_first`.
    deletion_set: DeletionSet,
    // Pending per-index insertions/removals, kept sorted by index.
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    // The hash currently persisted in storage, if any.
    #[allocative(visit = visit_allocative_simple)]
    stored_hash: Option<HasherOutput>,
    // Lazily computed hash cache; reset to `None` whenever the view is mutated.
    #[allocative(visit = visit_allocative_simple)]
    hash: Mutex<Option<HasherOutput>>,
}
216
217impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
218 type Target = KeyValueStoreView<C2>;
219
220 async fn with_context(
221 &mut self,
222 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
223 ) -> Self::Target {
224 let hash = *self.hash.lock().unwrap();
225 KeyValueStoreView {
226 context: ctx.clone()(self.context()),
227 deletion_set: self.deletion_set.clone(),
228 updates: self.updates.clone(),
229 stored_hash: self.stored_hash,
230 hash: Mutex::new(hash),
231 }
232 }
233}
234
impl<C: Context> View for KeyValueStoreView<C> {
    // Only one value (the stored hash) is fetched eagerly at load time.
    const NUM_INIT_KEYS: usize = 1;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        // The single initialization key: the persisted hash. Index entries are
        // read lazily by the accessor methods.
        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
        let v = vec![key_hash];
        Ok(v)
    }

    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        // Deserialize the stored hash (if present) and start with no pending changes.
        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        Ok(Self {
            context,
            deletion_set: DeletionSet::new(),
            updates: BTreeMap::new(),
            stored_hash: hash,
            hash: Mutex::new(hash),
        })
    }

    fn rollback(&mut self) {
        // Drop all pending changes and restore the hash persisted in storage.
        self.deletion_set.rollback();
        self.updates.clear();
        *self.hash.get_mut().unwrap() = self.stored_hash;
    }

    async fn has_pending_changes(&self) -> bool {
        if self.deletion_set.has_pending_changes() {
            return true;
        }
        if !self.updates.is_empty() {
            return true;
        }
        // A cached hash differing from the stored one also counts as pending.
        let hash = self.hash.lock().unwrap();
        self.stored_hash != *hash
    }

    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.deletion_set.delete_storage_first {
            // Wipe everything under the base key, then re-insert the updates
            // that are still set. The view counts as deleted only if no update
            // survives the wipe.
            delete_view = true;
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            for (index, update) in self.updates.iter() {
                if let Update::Set(value) = update {
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    batch.put_key_value_bytes(key, value.clone());
                    delete_view = false;
                }
            }
        } else {
            // Apply prefix deletions first, then the per-index updates.
            for index in self.deletion_set.deleted_prefixes.iter() {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, index);
                batch.delete_key_prefix(key);
            }
            for (index, update) in self.updates.iter() {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, index);
                match update {
                    Update::Removed => batch.delete_key(key),
                    Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
                }
            }
        }
        // Persist the cached hash when it differs from what storage holds.
        let hash = *self.hash.lock().unwrap();
        if self.stored_hash != hash {
            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
            match hash {
                None => batch.delete_key(key),
                Some(hash) => batch.put_key_value(key, &hash)?,
            }
        }
        Ok(delete_view)
    }

    fn post_save(&mut self) {
        // The batch was handed off: pending changes now reflect storage state.
        self.deletion_set.delete_storage_first = false;
        self.deletion_set.deleted_prefixes.clear();
        self.updates.clear();
        let hash = *self.hash.lock().unwrap();
        self.stored_hash = hash;
    }

    fn clear(&mut self) {
        // Mark everything for deletion and invalidate the hash cache.
        self.deletion_set.clear();
        self.updates.clear();
        *self.hash.get_mut().unwrap() = None;
    }
}
337
338impl<C: Context> ClonableView for KeyValueStoreView<C> {
339 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
340 Ok(KeyValueStoreView {
341 context: self.context.clone(),
342 deletion_set: self.deletion_set.clone(),
343 updates: self.updates.clone(),
344 stored_hash: self.stored_hash,
345 hash: Mutex::new(*self.hash.get_mut().unwrap()),
346 })
347 }
348}
349
impl<C: Context> KeyValueStoreView<C> {
    // Maximal acceptable length for an index: the store's maximal key size
    // minus the base-key prefix and the one-byte `KeyTag`.
    fn max_key_size(&self) -> usize {
        let prefix_len = self.context.base_key().bytes.len();
        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
    }

    /// Applies the function `f` over all indices, merging the keys found in
    /// storage with the pending `updates` and skipping entries removed or
    /// covered by a deleted prefix. Indices are visited in lexicographic
    /// order. The iteration stops as soon as `f` returns `false`.
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
            {
                loop {
                    match update {
                        // Pending updates sorted before (or at) the stored index
                        // are emitted first, keeping the overall order sorted.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                // The update shadows the stored entry.
                                break;
                            }
                        }
                        _ => {
                            // Stored entry: emit it unless a deleted prefix covers it.
                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Emit the remaining updates past the last stored key.
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies the function `f` over all indices, in lexicographic order.
    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_while(|key| {
            f(key)?;
            Ok(true)
        })
        .await
    }

    /// Applies the function `f` over all index/value pairs, with the same
    /// merging rules and ordering as [`Self::for_each_index_while`]. The
    /// iteration stops as soon as `f` returns `false`.
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
            {
                let (index, index_val) = entry;
                loop {
                    match update {
                        // Pending updates sorted before (or at) the stored index
                        // take precedence, as in `for_each_index_while`.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies the function `f` over all index/value pairs, in lexicographic
    /// order of the indices.
    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_value_while(|key, value| {
            f(key, value)?;
            Ok(true)
        })
        .await
    }

    /// Returns the list of indices, in lexicographic order.
    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
        let mut indices = Vec::new();
        self.for_each_index(|index| {
            indices.push(index.to_vec());
            Ok(())
        })
        .await?;
        Ok(indices)
    }

    /// Returns the list of index/value pairs, in lexicographic order of the indices.
    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let mut index_values = Vec::new();
        self.for_each_index_value(|index, value| {
            index_values.push((index.to_vec(), value.to_vec()));
            Ok(())
        })
        .await?;
        Ok(index_values)
    }

    /// Returns the number of entries (pending updates merged with storage).
    pub async fn count(&self) -> Result<usize, ViewError> {
        let mut count = 0;
        self.for_each_index(|_index| {
            count += 1;
            Ok(())
        })
        .await?;
        Ok(count)
    }

    /// Obtains the value at the given index, if any. Pending updates and
    /// deletions take precedence over the stored value.
    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let value = match update {
                Update::Removed => None,
                Update::Set(value) => Some(value.clone()),
            };
            return Ok(value);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(None);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().read_value_bytes(&key).await?)
    }

    /// Tests whether the view contains the given index, taking pending
    /// updates and deletions into account.
    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let test = match update {
                Update::Removed => false,
                Update::Set(_value) => true,
            };
            return Ok(test);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(false);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().contains_key(&key).await?)
    }

    /// Tests whether the view contains each of the given indices, resolving
    /// from pending updates first and batching the remaining storage lookups.
    pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
        let mut results = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(index) {
                let value = match update {
                    Update::Removed => false,
                    Update::Set(_) => true,
                };
                results.push(value);
            } else {
                // Placeholder; overwritten below if storage must be consulted.
                results.push(false);
                if !self.deletion_set.contains_prefix_of(index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    vector_query.push(key);
                }
            }
        }
        let values = self.context.store().contains_keys(&vector_query).await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            results[i] = value;
        }
        Ok(results)
    }

    /// Obtains the values at the given indices, resolving from pending updates
    /// first and batching the remaining storage reads into one query.
    pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
        let mut result = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(index) {
                let value = match update {
                    Update::Removed => None,
                    Update::Set(value) => Some(value.clone()),
                };
                result.push(value);
            } else {
                // Placeholder; overwritten below if storage must be consulted.
                result.push(None);
                if !self.deletion_set.contains_prefix_of(index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    vector_query.push(key);
                }
            }
        }
        let values = self
            .context
            .store()
            .read_multi_values_bytes(&vector_query)
            .await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            result[i] = value;
        }
        Ok(result)
    }

    /// Applies the given batch of write operations to the in-memory state of
    /// the view (nothing is persisted until the view is saved). Invalidates
    /// the cached hash.
    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
        *self.hash.get_mut().unwrap() = None;
        let max_key_size = self.max_key_size();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    if self.deletion_set.contains_prefix_of(&key) {
                        // The key is already covered by a deleted prefix, so a
                        // `Removed` marker is redundant; drop any pending update.
                        self.updates.remove(&key);
                    } else {
                        self.updates.insert(key, Update::Removed);
                    }
                }
                WriteOperation::Put { key, value } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    self.updates.insert(key, Update::Set(value));
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
                    // Drop the pending updates under the prefix, then record
                    // the prefix deletion itself.
                    let key_list = self
                        .updates
                        .range(get_key_range_for_prefix(key_prefix.clone()))
                        .map(|x| x.0.to_vec())
                        .collect::<Vec<_>>();
                    for key in key_list {
                        self.updates.remove(&key);
                    }
                    self.deletion_set.insert_key_prefix(key_prefix);
                }
            }
        }
        Ok(())
    }

    /// Sets or inserts a value at the given index.
    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(index, value);
        self.write_batch(batch).await
    }

    /// Removes the value at the given index; a no-op if the index is absent.
    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key(index);
        self.write_batch(batch).await
    }

    /// Removes all values whose index starts with `key_prefix`.
    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key_prefix(key_prefix);
        self.write_batch(batch).await
    }

    /// Finds the indices matching the given prefix, with the prefix stripped
    /// from the returned keys. Merges storage with pending updates, in
    /// lexicographic order.
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        // Restrict the pending updates to the half-open range spanned by the prefix.
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        // Update keys are full indices; strip `len` to compare
                        // against the store's prefix-stripped keys.
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // Re-prefix the stored key to check it against the
                            // deleted prefixes, which are full indices.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }

    /// Finds the index/value pairs matching the given prefix, with the prefix
    /// stripped from the returned keys. Merges storage with pending updates,
    /// in lexicographic order of the indices.
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        // Restrict the pending updates to the half-open range spanned by the prefix.
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                loop {
                    match update {
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // Re-prefix the stored key before checking the
                            // deleted prefixes, which hold full indices.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }

    // Computes the hash of the view: each index/value pair is folded into a
    // SHA3-256 hasher, followed by the pair count (BCS-serialized).
    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
        let mut hasher = sha3::Sha3_256::default();
        let mut count = 0u32;
        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
            count += 1;
            hasher.update_with_bytes(index)?;
            hasher.update_with_bytes(value)?;
            Ok(())
        })
        .await?;
        hasher.update_with_bcs_bytes(&count)?;
        Ok(hasher.finalize())
    }
}
1086
1087impl<C: Context> HashableView for KeyValueStoreView<C> {
1088 type Hasher = sha3::Sha3_256;
1089
1090 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1091 let hash = *self.hash.get_mut().unwrap();
1092 match hash {
1093 Some(hash) => Ok(hash),
1094 None => {
1095 let new_hash = self.compute_hash().await?;
1096 let hash = self.hash.get_mut().unwrap();
1097 *hash = Some(new_hash);
1098 Ok(new_hash)
1099 }
1100 }
1101 }
1102
1103 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1104 let hash = *self.hash.lock().unwrap();
1105 match hash {
1106 Some(hash) => Ok(hash),
1107 None => {
1108 let new_hash = self.compute_hash().await?;
1109 let mut hash = self.hash.lock().unwrap();
1110 *hash = Some(new_hash);
1111 Ok(new_hash)
1112 }
1113 }
1114 }
1115}
1116
/// A [`KeyValueStoreView`] wrapped so that its hash is managed by
/// [`WrappedHashableContainerView`].
pub type HashedKeyValueStoreView<C> =
    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;

/// A [`KeyValueStoreView`] wrapped in a [`HistoricallyHashableView`].
pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
/// A testing-only key-value store backed by a shared, lock-protected
/// [`KeyValueStoreView`].
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    // Shared so that clones of the container operate on the same view.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1130
#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    // Errors from the underlying view surface as `ViewContainerError`.
    type Error = ViewContainerError;
}
1135
/// The error type of the [`ViewContainer`] store.
#[cfg(with_testing)]
#[derive(Error, Debug)]
pub enum ViewContainerError {
    /// An error occurred in the underlying [`KeyValueStoreView`].
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A BCS (de)serialization error occurred.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1148
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    // Backend identifier used in error reporting.
    const BACKEND: &'static str = "view_container";
}
1153
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        1
    }

    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
        Ok(Vec::new())
    }

    /// Delegates to [`KeyValueStoreView::get`] under a read lock.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        Ok(self.view.read().await.get(key).await?)
    }

    /// Delegates to [`KeyValueStoreView::contains_key`] under a read lock.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        Ok(self.view.read().await.contains_key(key).await?)
    }

    /// Delegates to [`KeyValueStoreView::contains_keys`] under a read lock.
    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
        Ok(self.view.read().await.contains_keys(keys).await?)
    }

    /// Delegates to [`KeyValueStoreView::multi_get`] under a read lock.
    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        Ok(self.view.read().await.multi_get(keys).await?)
    }

    /// Delegates to [`KeyValueStoreView::find_keys_by_prefix`] under a read lock.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        Ok(self.view.read().await.find_keys_by_prefix(key_prefix).await?)
    }

    /// Delegates to [`KeyValueStoreView::find_key_values_by_prefix`] under a read lock.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        Ok(self
            .view
            .read()
            .await
            .find_key_values_by_prefix(key_prefix)
            .await?)
    }
}
1205
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies `batch` to the in-memory view, then immediately persists the
    /// resulting changes to the backing store.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut view = self.view.write().await;
        view.write_batch(batch).await?;
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        // NOTE(review): the `delete_view` flag returned by `pre_save` is
        // discarded, and `post_save` runs before the store write has
        // succeeded, so a failed write leaves the in-memory state marked as
        // saved. Likely acceptable for this testing-only container — confirm
        // before reusing the pattern elsewhere.
        view.post_save();
        view.context()
            .store()
            .write_batch(batch)
            .await
            .map_err(ViewError::from)?;
        Ok(())
    }

    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        // This container keeps no journal; nothing to clear.
        Ok(())
    }
}
1228
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Creates a [`ViewContainer`] backed by a freshly loaded [`KeyValueStoreView`].
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let inner = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(inner)),
        })
    }
}