1use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};
17
18use allocative::Allocative;
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{data_types::ArithmeticError, ensure, visit_allocative_simple};
22use serde::{Deserialize, Serialize};
23
24use crate::{
25 batch::{Batch, WriteOperation},
26 common::{
27 from_bytes_option, from_bytes_option_or_default, get_key_range_for_prefix, get_upper_bound,
28 DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
29 },
30 context::Context,
31 hashable_wrapper::WrappedHashableContainerView,
32 historical_hash_wrapper::HistoricallyHashableView,
33 map_view::ByteMapView,
34 store::ReadableKeyValueStore,
35 views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
36};
37
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus histograms measuring the latency of the main
    //! `KeyValueStoreView` operations. All histograms use the same
    //! exponential latency buckets and no labels.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Latency of `KeyValueStoreView` hash computations.
    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_hash_latency",
            "KeyValueStoreView hash latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// Latency of single-value `get` operations.
    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "key_value_store_view_get_latency",
            "KeyValueStoreView get latency",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });

    /// Latency of `multi_get` operations.
    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_multi_get_latency",
                "KeyValueStoreView multi get latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `contains_key` operations.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_key_latency",
                "KeyValueStoreView contains key latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `contains_keys` (batched) operations.
    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_contains_keys_latency",
                "KeyValueStoreView contains keys latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `find_keys_by_prefix` operations.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_keys_by_prefix_latency",
                "KeyValueStoreView find keys by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `find_key_values_by_prefix` operations.
    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_find_key_values_by_prefix_latency",
                "KeyValueStoreView find key values by prefix latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });

    /// Latency of `write_batch` operations.
    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "key_value_store_view_write_batch_latency",
                "KeyValueStoreView write batch latency",
                &[],
                exponential_bucket_latencies(5.0),
            )
        });
}
131
132#[cfg(with_testing)]
133use {
134 crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
135 async_lock::RwLock,
136 std::sync::Arc,
137 thiserror::Error,
138};
139
/// Tags distinguishing the kinds of keys that a [`KeyValueStoreView`]
/// stores under its base key.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the user key/value entries.
    Index = MIN_VIEW_TAG,
    /// Key holding the serialized total size of the view.
    TotalSize,
    /// Prefix for the inner map tracking per-entry value sizes.
    Sizes,
    /// Key holding the stored hash, if any.
    Hash,
}
151
/// Aggregate byte sizes of the keys and values stored in a [`KeyValueStoreView`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Allocative)]
pub struct SizeData {
    /// Total size of the keys, in bytes.
    pub key: u32,
    /// Total size of the values, in bytes.
    pub value: u32,
}
160
161impl SizeData {
162 pub fn sum(&mut self) -> u32 {
164 self.key + self.value
165 }
166
167 pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
169 self.key = self
170 .key
171 .checked_add(size.key)
172 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
173 self.value = self
174 .value
175 .checked_add(size.value)
176 .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
177 Ok(())
178 }
179
180 pub fn sub_assign(&mut self, size: SizeData) {
182 self.key -= size.key;
183 self.value -= size.value;
184 }
185}
186
/// A view that exposes a full key/value store rooted at a base key, buffering
/// pending changes in memory until the view is saved.
#[derive(Debug, Allocative)]
#[allocative(bound = "C")]
pub struct KeyValueStoreView<C> {
    /// Access to storage and the base key of this view.
    #[allocative(skip)]
    context: C,
    /// Prefixes deleted since the last save, and whether the whole storage
    /// span must be cleared first.
    deletion_set: DeletionSet,
    /// Pending insertions and removals, keyed by user key (base key excluded).
    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
    /// Total entry sizes as last persisted.
    stored_total_size: SizeData,
    /// Total entry sizes including pending updates.
    total_size: SizeData,
    /// Per-key value sizes, tracked in a nested map view under `KeyTag::Sizes`.
    sizes: ByteMapView<C, u32>,
    /// Hash as last persisted, if any.
    #[allocative(visit = visit_allocative_simple)]
    stored_hash: Option<HasherOutput>,
    /// Cached hash of the current contents; `None` when it must be recomputed.
    #[allocative(visit = visit_allocative_simple)]
    hash: Mutex<Option<HasherOutput>>,
}
227
228impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
229 type Target = KeyValueStoreView<C2>;
230
231 async fn with_context(
232 &mut self,
233 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
234 ) -> Self::Target {
235 let hash = *self.hash.lock().unwrap();
236 KeyValueStoreView {
237 context: ctx.clone()(self.context()),
238 deletion_set: self.deletion_set.clone(),
239 updates: self.updates.clone(),
240 stored_total_size: self.stored_total_size,
241 total_size: self.total_size,
242 sizes: self.sizes.with_context(ctx.clone()).await,
243 stored_hash: self.stored_hash,
244 hash: Mutex::new(hash),
245 }
246 }
247}
248
impl<C: Context> View for KeyValueStoreView<C> {
    // Two extra keys (stored hash, stored total size) on top of whatever the
    // inner `sizes` map needs.
    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    /// Lists the keys to fetch at load time: the stored hash, the stored
    /// total size, then the keys needed by the inner `sizes` map.
    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
        let mut v = vec![key_hash, key_total_size];
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
        Ok(v)
    }

    /// Reconstructs the view from the values fetched for `pre_load`,
    /// which must arrive in the same order.
    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        let total_size =
            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
        let context_sizes = context.clone_with_base_key(base_key);
        let sizes = ByteMapView::post_load(
            context_sizes,
            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
        )?;
        Ok(Self {
            context,
            deletion_set: DeletionSet::new(),
            updates: BTreeMap::new(),
            stored_total_size: total_size,
            total_size,
            sizes,
            stored_hash: hash,
            hash: Mutex::new(hash),
        })
    }

    /// Discards all pending changes, restoring the last persisted state.
    fn rollback(&mut self) {
        self.deletion_set.rollback();
        self.updates.clear();
        self.total_size = self.stored_total_size;
        self.sizes.rollback();
        *self.hash.get_mut().unwrap() = self.stored_hash;
    }

    async fn has_pending_changes(&self) -> bool {
        if self.deletion_set.has_pending_changes() {
            return true;
        }
        if !self.updates.is_empty() {
            return true;
        }
        if self.stored_total_size != self.total_size {
            return true;
        }
        if self.sizes.has_pending_changes().await {
            return true;
        }
        let hash = self.hash.lock().unwrap();
        self.stored_hash != *hash
    }

    /// Stages the pending changes into `batch`. Returns `true` when the save
    /// amounts to deleting the entire view (it was cleared and nothing was
    /// re-inserted afterwards).
    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.deletion_set.delete_storage_first {
            // Clear everything under the base key, then re-insert the
            // entries set after the clear; any such entry means the view
            // itself is not being deleted.
            delete_view = true;
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            for (index, update) in self.updates.iter() {
                if let Update::Set(value) = update {
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    batch.put_key_value_bytes(key, value.clone());
                    delete_view = false;
                }
            }
        } else {
            // Incremental save: delete the removed prefixes, then apply the
            // individual insertions/removals.
            for index in self.deletion_set.deleted_prefixes.iter() {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, index);
                batch.delete_key_prefix(key);
            }
            for (index, update) in self.updates.iter() {
                let key = self
                    .context
                    .base_key()
                    .base_tag_index(KeyTag::Index as u8, index);
                match update {
                    Update::Removed => batch.delete_key(key),
                    Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
                }
            }
        }
        self.sizes.pre_save(batch)?;
        // Persist the hash and total size only when they changed.
        let hash = *self.hash.lock().unwrap();
        if self.stored_hash != hash {
            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
            match hash {
                None => batch.delete_key(key),
                Some(hash) => batch.put_key_value(key, &hash)?,
            }
        }
        if self.stored_total_size != self.total_size {
            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
            batch.put_key_value(key, &self.total_size)?;
        }
        Ok(delete_view)
    }

    /// Marks the staged changes as persisted, making the in-memory state the
    /// new baseline.
    fn post_save(&mut self) {
        self.deletion_set.delete_storage_first = false;
        self.deletion_set.deleted_prefixes.clear();
        self.updates.clear();
        self.sizes.post_save();
        let hash = *self.hash.lock().unwrap();
        self.stored_hash = hash;
        self.stored_total_size = self.total_size;
    }

    /// Clears the view, staging deletion of all stored data at the next save.
    fn clear(&mut self) {
        self.deletion_set.clear();
        self.updates.clear();
        self.total_size = SizeData::default();
        self.sizes.clear();
        *self.hash.get_mut().unwrap() = None;
    }
}
383
384impl<C: Context> ClonableView for KeyValueStoreView<C> {
385 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
386 Ok(KeyValueStoreView {
387 context: self.context.clone(),
388 deletion_set: self.deletion_set.clone(),
389 updates: self.updates.clone(),
390 stored_total_size: self.stored_total_size,
391 total_size: self.total_size,
392 sizes: self.sizes.clone_unchecked()?,
393 stored_hash: self.stored_hash,
394 hash: Mutex::new(*self.hash.get_mut().unwrap()),
395 })
396 }
397}
398
impl<C: Context> KeyValueStoreView<C> {
    /// Maximal allowed size of a user key: the store's limit minus the base
    /// key prefix and the one-byte `KeyTag` prepended to every index.
    fn max_key_size(&self) -> usize {
        let prefix_len = self.context.base_key().bytes.len();
        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
    }

    /// Returns the total sizes (keys and values, in bytes) of the entries,
    /// pending updates included.
    pub fn total_size(&self) -> SizeData {
        self.total_size
    }

    /// Applies `f` to each index (user key) in lexicographic order, stopping
    /// as soon as `f` returns `Ok(false)`. Stored keys are merged with the
    /// pending in-memory updates; keys under a deleted prefix are skipped.
    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for index in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix)
                .await?
            {
                loop {
                    match update {
                        // Pending updates sorting before (or at) the stored
                        // key are emitted first; an equal key shadows the
                        // stored one.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(_) = value {
                                if !f(key)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            // Stored key, unless it lies under a deleted prefix.
                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Remaining updates sort after every stored key.
        while let Some((key, value)) = update {
            if let Update::Set(_) = value {
                if !f(key)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies `f` to each index (user key) in lexicographic order.
    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_while(|key| {
            f(key)?;
            Ok(true)
        })
        .await
    }

    /// Applies `f` to each index/value pair in lexicographic order of keys,
    /// stopping as soon as `f` returns `Ok(false)`. Same merge logic as
    /// `for_each_index_while`, with values carried along.
    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
    {
        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
        let mut updates = self.updates.iter();
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix)
                .await?
            {
                let (index, index_val) = entry;
                loop {
                    match update {
                        // Pending update sorts first; an equal key shadows
                        // the stored entry.
                        Some((key, value)) if key <= &index => {
                            if let Update::Set(value) = value {
                                if !f(key, value)? {
                                    return Ok(());
                                }
                            }
                            update = updates.next();
                            if key == &index {
                                break;
                            }
                        }
                        _ => {
                            // Stored entry, unless under a deleted prefix.
                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
                                return Ok(());
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Remaining updates sort after every stored entry.
        while let Some((key, value)) = update {
            if let Update::Set(value) = value {
                if !f(key, value)? {
                    return Ok(());
                }
            }
            update = updates.next();
        }
        Ok(())
    }

    /// Applies `f` to each index/value pair in lexicographic order of keys.
    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
    where
        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
    {
        self.for_each_index_value_while(|key, value| {
            f(key, value)?;
            Ok(true)
        })
        .await
    }

    /// Returns all indices (user keys), in lexicographic order.
    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
        let mut indices = Vec::new();
        self.for_each_index(|index| {
            indices.push(index.to_vec());
            Ok(())
        })
        .await?;
        Ok(indices)
    }

    /// Returns all index/value pairs, in lexicographic order of keys.
    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        let mut index_values = Vec::new();
        self.for_each_index_value(|index, value| {
            index_values.push((index.to_vec(), value.to_vec()));
            Ok(())
        })
        .await?;
        Ok(index_values)
    }

    /// Returns the number of entries, pending updates included.
    pub async fn count(&self) -> Result<usize, ViewError> {
        let mut count = 0;
        self.for_each_index(|_index| {
            count += 1;
            Ok(())
        })
        .await?;
        Ok(count)
    }

    /// Obtains the value at `index`, if any. Pending updates and deleted
    /// prefixes take precedence over the stored state.
    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let value = match update {
                Update::Removed => None,
                Update::Set(value) => Some(value.clone()),
            };
            return Ok(value);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(None);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().read_value_bytes(&key).await?)
    }

    /// Tests whether the view contains a value for `index`.
    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
        if let Some(update) = self.updates.get(index) {
            let test = match update {
                Update::Removed => false,
                Update::Set(_value) => true,
            };
            return Ok(test);
        }
        if self.deletion_set.contains_prefix_of(index) {
            return Ok(false);
        }
        let key = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, index);
        Ok(self.context.store().contains_key(&key).await?)
    }

    /// Tests existence for several indices at once, returning one boolean per
    /// index, in order. Only indices unresolved in memory hit the store.
    pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
        let mut results = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(index) {
                let value = match update {
                    Update::Removed => false,
                    Update::Set(_) => true,
                };
                results.push(value);
            } else {
                // Default to `false`; corrected below if the store has the key
                // and it is not under a deleted prefix.
                results.push(false);
                if !self.deletion_set.contains_prefix_of(index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    vector_query.push(key);
                }
            }
        }
        let values = self.context.store().contains_keys(&vector_query).await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            results[i] = value;
        }
        Ok(results)
    }

    /// Obtains the values at the given indices, in order. Only indices
    /// unresolved in memory hit the store (in a single batched read).
    pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
        let mut result = Vec::with_capacity(indices.len());
        let mut missed_indices = Vec::new();
        let mut vector_query = Vec::new();
        for (i, index) in indices.iter().enumerate() {
            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
            if let Some(update) = self.updates.get(index) {
                let value = match update {
                    Update::Removed => None,
                    Update::Set(value) => Some(value.clone()),
                };
                result.push(value);
            } else {
                // Placeholder; filled in below from the batched store read.
                result.push(None);
                if !self.deletion_set.contains_prefix_of(index) {
                    missed_indices.push(i);
                    let key = self
                        .context
                        .base_key()
                        .base_tag_index(KeyTag::Index as u8, index);
                    vector_query.push(key);
                }
            }
        }
        let values = self
            .context
            .store()
            .read_multi_values_bytes(&vector_query)
            .await?;
        for (i, value) in missed_indices.into_iter().zip(values) {
            result[i] = value;
        }
        Ok(result)
    }

    /// Applies the operations from `batch` to the in-memory state, updating
    /// `updates`, `sizes`, `total_size` and the deletion set, and invalidating
    /// the cached hash. Nothing is persisted until the view is saved.
    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
        *self.hash.get_mut().unwrap() = None;
        let max_key_size = self.max_key_size();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    // Subtract the tracked size of the entry being removed.
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.remove(key.clone());
                    if self.deletion_set.contains_prefix_of(&key) {
                        // Storage is already cleared for this key: no
                        // tombstone needed, just drop any pending update.
                        self.updates.remove(&key);
                    } else {
                        self.updates.insert(key, Update::Removed);
                    }
                }
                WriteOperation::Put { key, value } => {
                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
                    // NOTE(review): `as u32` here can silently truncate for
                    // lengths above u32::MAX, whereas the Delete branch uses
                    // `u32::try_from` — confirm whether this asymmetry is
                    // intentional.
                    let entry_size = SizeData {
                        key: key.len() as u32,
                        value: value.len() as u32,
                    };
                    self.total_size.add_assign(entry_size)?;
                    // If this overwrites an entry, retract its previous size.
                    if let Some(value) = self.sizes.get(&key).await? {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                    }
                    self.sizes.insert(key.clone(), entry_size.value);
                    self.updates.insert(key, Update::Set(value));
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
                    // Drop pending updates under the prefix.
                    let key_list = self
                        .updates
                        .range(get_key_range_for_prefix(key_prefix.clone()))
                        .map(|x| x.0.to_vec())
                        .collect::<Vec<_>>();
                    for key in key_list {
                        self.updates.remove(&key);
                    }
                    // Retract the sizes of all entries under the prefix.
                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
                    for (key, value) in key_values {
                        let entry_size = SizeData {
                            key: key.len() as u32,
                            value,
                        };
                        self.total_size.sub_assign(entry_size);
                        self.sizes.remove(key);
                    }
                    self.sizes.remove_by_prefix(key_prefix.clone());
                    self.deletion_set.insert_key_prefix(key_prefix);
                }
            }
        }
        Ok(())
    }

    /// Sets the value at `index` (convenience wrapper around `write_batch`).
    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(index, value);
        self.write_batch(batch).await
    }

    /// Removes the value at `index`, if any (wrapper around `write_batch`).
    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key(index);
        self.write_batch(batch).await
    }

    /// Removes all entries whose key starts with `key_prefix` (wrapper
    /// around `write_batch`).
    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
        let mut batch = Batch::new();
        batch.delete_key_prefix(key_prefix);
        self.write_batch(batch).await
    }

    /// Returns the keys matching `key_prefix`, with the prefix stripped, in
    /// lexicographic order. Stored keys are merged with pending updates.
    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
        #[cfg(with_metrics)]
        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut keys = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // Stored keys come back relative to `key_prefix_full`; update
            // keys are full user keys, so `[len..]` makes them comparable.
            for key in self
                .context
                .store()
                .find_keys_by_prefix(&key_prefix_full)
                .await?
            {
                loop {
                    match update {
                        Some((update_key, update_value))
                            if &update_key[len..] <= key.as_slice() =>
                        {
                            if let Update::Set(_) = update_value {
                                keys.push(update_key[len..].to_vec());
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            // Deleted prefixes are expressed over full user
                            // keys, so re-prepend the prefix to test them.
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                keys.push(key);
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Remaining updates sort after every stored key.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(_) = update_value {
                let update_key = update_key[len..].to_vec();
                keys.push(update_key);
            }
            update = updates.next();
        }
        Ok(keys)
    }

    /// Returns the key/value pairs matching `key_prefix`, with the prefix
    /// stripped from keys, in lexicographic order of keys.
    pub async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
        #[cfg(with_metrics)]
        let _latency =
            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
        ensure!(
            key_prefix.len() <= self.max_key_size(),
            ViewError::KeyTooLong
        );
        let len = key_prefix.len();
        let key_prefix_full = self
            .context
            .base_key()
            .base_tag_index(KeyTag::Index as u8, key_prefix);
        let mut key_values = Vec::new();
        let key_prefix_upper = get_upper_bound(key_prefix);
        let mut updates = self
            .updates
            .range((Included(key_prefix.to_vec()), key_prefix_upper));
        let mut update = updates.next();
        if !self.deletion_set.delete_storage_first {
            let mut suffix_closed_set =
                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
            // Same merge as `find_keys_by_prefix`, with values carried along.
            for entry in self
                .context
                .store()
                .find_key_values_by_prefix(&key_prefix_full)
                .await?
            {
                let (key, value) = entry;
                loop {
                    match update {
                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
                            if let Update::Set(update_value) = update_value {
                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                                key_values.push(key_value);
                            }
                            update = updates.next();
                            if update_key[len..] == key[..] {
                                break;
                            }
                        }
                        _ => {
                            let mut key_with_prefix = key_prefix.to_vec();
                            key_with_prefix.extend_from_slice(&key);
                            if !suffix_closed_set.find_key(&key_with_prefix) {
                                key_values.push((key, value));
                            }
                            break;
                        }
                    }
                }
            }
        }
        // Remaining updates sort after every stored entry.
        while let Some((update_key, update_value)) = update {
            if let Update::Set(update_value) = update_value {
                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
                key_values.push(key_value);
            }
            update = updates.next();
        }
        Ok(key_values)
    }

    /// Hashes all index/value pairs followed by the entry count.
    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
        let mut hasher = sha3::Sha3_256::default();
        let mut count = 0u32;
        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
            count += 1;
            hasher.update_with_bytes(index)?;
            hasher.update_with_bytes(value)?;
            Ok(())
        })
        .await?;
        hasher.update_with_bcs_bytes(&count)?;
        Ok(hasher.finalize())
    }
}
1182
1183impl<C: Context> HashableView for KeyValueStoreView<C> {
1184 type Hasher = sha3::Sha3_256;
1185
1186 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1187 let hash = *self.hash.get_mut().unwrap();
1188 match hash {
1189 Some(hash) => Ok(hash),
1190 None => {
1191 let new_hash = self.compute_hash().await?;
1192 let hash = self.hash.get_mut().unwrap();
1193 *hash = Some(new_hash);
1194 Ok(new_hash)
1195 }
1196 }
1197 }
1198
1199 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1200 let hash = *self.hash.lock().unwrap();
1201 match hash {
1202 Some(hash) => Ok(hash),
1203 None => {
1204 let new_hash = self.compute_hash().await?;
1205 let mut hash = self.hash.lock().unwrap();
1206 *hash = Some(new_hash);
1207 Ok(new_hash)
1208 }
1209 }
1210 }
1211}
1212
/// A [`KeyValueStoreView`] whose hash is maintained by a hashable wrapper.
pub type HashedKeyValueStoreView<C> =
    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;
1216
/// A [`KeyValueStoreView`] wrapped for historical hashing.
pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1219
/// A testing-only key/value store implemented on top of a shared
/// [`KeyValueStoreView`].
#[cfg(with_testing)]
#[derive(Debug, Clone)]
pub struct ViewContainer<C> {
    /// The wrapped view, shared behind an async read/write lock.
    view: Arc<RwLock<KeyValueStoreView<C>>>,
}
1226
#[cfg(with_testing)]
impl<C> WithError for ViewContainer<C> {
    // Store operations of this container surface `ViewContainerError`.
    type Error = ViewContainerError;
}
1231
/// Errors that can be returned by [`ViewContainer`] store operations.
#[cfg(with_testing)]
#[derive(Error, Debug)]
pub enum ViewContainerError {
    /// An error at the view level.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
1244
#[cfg(with_testing)]
impl KeyValueStoreError for ViewContainerError {
    /// Backend name reported in errors originating from this container.
    const BACKEND: &'static str = "view_container";
}
1249
#[cfg(with_testing)]
impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;

    fn max_stream_queries(&self) -> usize {
        1
    }

    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
        Ok(Vec::new())
    }

    /// Delegates to [`KeyValueStoreView::get`] under a read lock.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
        Ok(self.view.read().await.get(key).await?)
    }

    /// Delegates to [`KeyValueStoreView::contains_key`] under a read lock.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
        Ok(self.view.read().await.contains_key(key).await?)
    }

    /// Delegates to [`KeyValueStoreView::contains_keys`] under a read lock.
    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
        Ok(self.view.read().await.contains_keys(keys).await?)
    }

    /// Delegates to [`KeyValueStoreView::multi_get`] under a read lock.
    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
        Ok(self.view.read().await.multi_get(keys).await?)
    }

    /// Delegates to [`KeyValueStoreView::find_keys_by_prefix`] under a read lock.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
        Ok(self.view.read().await.find_keys_by_prefix(key_prefix).await?)
    }

    /// Delegates to [`KeyValueStoreView::find_key_values_by_prefix`] under a
    /// read lock.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
        Ok(self
            .view
            .read()
            .await
            .find_key_values_by_prefix(key_prefix)
            .await?)
    }
}
1301
#[cfg(with_testing)]
impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;

    /// Applies `batch` to the wrapped view and immediately persists the
    /// result to the underlying store.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
        let mut view = self.view.write().await;
        view.write_batch(batch).await?;
        let mut batch = Batch::new();
        view.pre_save(&mut batch)?;
        // NOTE(review): `post_save` runs before the store write below; if
        // that write fails, the in-memory view already believes it is
        // flushed. Likely acceptable for this testing-only container, but
        // confirm before reusing the pattern elsewhere.
        view.post_save();
        view.context()
            .store()
            .write_batch(batch)
            .await
            .map_err(ViewError::from)?;
        Ok(())
    }

    /// This container uses no journal, so there is nothing to clear.
    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
        Ok(())
    }
}
1324
#[cfg(with_testing)]
impl<C: Context> ViewContainer<C> {
    /// Loads a [`KeyValueStoreView`] from `context` and wraps it in a new
    /// shared container.
    pub async fn new(context: C) -> Result<Self, ViewError> {
        let inner = KeyValueStoreView::load(context).await?;
        Ok(Self {
            view: Arc::new(RwLock::new(inner)),
        })
    }
}