Skip to main content

linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! We implement two types:
5//! 1) The first type `KeyValueStoreView` implements View and the function of `KeyValueStore`.
6//!
7//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
8//!    * The `Clone` trait
9//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
10//!    * a `write_batch` that writes in the context instead of writing of the view.
11//!
12//! Currently, that second type is only used for tests.
13//!
14//! Key tags to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};
17
18use allocative::Allocative;
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{data_types::ArithmeticError, ensure, visit_allocative_simple};
22use serde::{Deserialize, Serialize};
23
24use crate::{
25    batch::{Batch, WriteOperation},
26    common::{
27        from_bytes_option, from_bytes_option_or_default, get_key_range_for_prefix, get_upper_bound,
28        DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
29    },
30    context::Context,
31    hashable_wrapper::WrappedHashableContainerView,
32    historical_hash_wrapper::HistoricallyHashableView,
33    map_view::ByteMapView,
34    store::ReadableKeyValueStore,
35    views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
36};
37
38#[cfg(with_metrics)]
39mod metrics {
40    use std::sync::LazyLock;
41
42    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
43    use prometheus::HistogramVec;
44
45    /// The latency of hash computation
46    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
47        register_histogram_vec(
48            "key_value_store_view_hash_latency",
49            "KeyValueStoreView hash latency",
50            &[],
51            exponential_bucket_latencies(5.0),
52        )
53    });
54
55    /// The latency of get operation
56    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
57        register_histogram_vec(
58            "key_value_store_view_get_latency",
59            "KeyValueStoreView get latency",
60            &[],
61            exponential_bucket_latencies(5.0),
62        )
63    });
64
65    /// The latency of multi get
66    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
67        LazyLock::new(|| {
68            register_histogram_vec(
69                "key_value_store_view_multi_get_latency",
70                "KeyValueStoreView multi get latency",
71                &[],
72                exponential_bucket_latencies(5.0),
73            )
74        });
75
76    /// The latency of contains key
77    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
78        LazyLock::new(|| {
79            register_histogram_vec(
80                "key_value_store_view_contains_key_latency",
81                "KeyValueStoreView contains key latency",
82                &[],
83                exponential_bucket_latencies(5.0),
84            )
85        });
86
87    /// The latency of contains keys
88    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
89        LazyLock::new(|| {
90            register_histogram_vec(
91                "key_value_store_view_contains_keys_latency",
92                "KeyValueStoreView contains keys latency",
93                &[],
94                exponential_bucket_latencies(5.0),
95            )
96        });
97
98    /// The latency of find keys by prefix operation
99    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
100        LazyLock::new(|| {
101            register_histogram_vec(
102                "key_value_store_view_find_keys_by_prefix_latency",
103                "KeyValueStoreView find keys by prefix latency",
104                &[],
105                exponential_bucket_latencies(5.0),
106            )
107        });
108
109    /// The latency of find key values by prefix operation
110    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
111        LazyLock::new(|| {
112            register_histogram_vec(
113                "key_value_store_view_find_key_values_by_prefix_latency",
114                "KeyValueStoreView find key values by prefix latency",
115                &[],
116                exponential_bucket_latencies(5.0),
117            )
118        });
119
120    /// The latency of write batch operation
121    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
122        LazyLock::new(|| {
123            register_histogram_vec(
124                "key_value_store_view_write_batch_latency",
125                "KeyValueStoreView write batch latency",
126                &[],
127                exponential_bucket_latencies(5.0),
128            )
129        });
130}
131
132#[cfg(with_testing)]
133use {
134    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
135    async_lock::RwLock,
136    std::sync::Arc,
137    thiserror::Error,
138};
139
140#[repr(u8)]
141enum KeyTag {
142    /// Prefix for the indices of the view.
143    Index = MIN_VIEW_TAG,
144    /// The total stored size
145    TotalSize,
146    /// The prefix where the sizes are being stored
147    Sizes,
148    /// Prefix for the hash.
149    Hash,
150}
151
152/// A pair containing the key and value size.
153#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Allocative)]
154pub struct SizeData {
155    /// The size of the key
156    pub key: u32,
157    /// The size of the value
158    pub value: u32,
159}
160
161impl SizeData {
162    /// Sums both terms
163    pub fn sum(&mut self) -> u32 {
164        self.key + self.value
165    }
166
167    /// Adds a size to `self`
168    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
169        self.key = self
170            .key
171            .checked_add(size.key)
172            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
173        self.value = self
174            .value
175            .checked_add(size.value)
176            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
177        Ok(())
178    }
179
180    /// Subtracts a size from `self`
181    pub fn sub_assign(&mut self, size: SizeData) {
182        self.key -= size.key;
183        self.value -= size.value;
184    }
185}
186
187/// A view that represents the functions of `KeyValueStore`.
188///
189/// Comment on the data set:
190/// In order to work, the view needs to store the updates and deleted prefixes.
191/// The updates and deleted prefixes have to be coherent. This means:
192/// * If an index is deleted by one in deleted prefixes then it should not be present
193///   in the updates at all.
194/// * [`DeletePrefix::key_prefix`][entry1] should not dominate anyone. That is if we have `[0,2]`
195///   then we should not have `[0,2,3]` since it would be dominated by the preceding.
196///
197/// With that we have:
198/// * in order to test if an `index` is deleted by a prefix we compute the highest deleted prefix `dp`
199///   such that `dp <= index`.
200///   If `dp` is indeed a prefix then we conclude that `index` is deleted, otherwise not.
201///   The no domination is essential here.
202///
203/// [entry1]: crate::batch::WriteOperation::DeletePrefix
204#[derive(Debug, Allocative)]
205#[allocative(bound = "C")]
206pub struct KeyValueStoreView<C> {
207    /// The view context.
208    #[allocative(skip)]
209    context: C,
210    /// Tracks deleted key prefixes.
211    deletion_set: DeletionSet,
212    /// Pending changes not yet persisted to storage.
213    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
214    /// The total size of keys and values persisted in storage.
215    stored_total_size: SizeData,
216    /// The total size of keys and values including pending changes.
217    total_size: SizeData,
218    /// Map of key to value size for tracking storage usage.
219    sizes: ByteMapView<C, u32>,
220    /// The hash persisted in storage.
221    #[allocative(visit = visit_allocative_simple)]
222    stored_hash: Option<HasherOutput>,
223    /// Memoized hash, if any.
224    #[allocative(visit = visit_allocative_simple)]
225    hash: Mutex<Option<HasherOutput>>,
226}
227
228impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
229    type Target = KeyValueStoreView<C2>;
230
231    async fn with_context(
232        &mut self,
233        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
234    ) -> Self::Target {
235        let hash = *self.hash.lock().unwrap();
236        KeyValueStoreView {
237            context: ctx.clone()(self.context()),
238            deletion_set: self.deletion_set.clone(),
239            updates: self.updates.clone(),
240            stored_total_size: self.stored_total_size,
241            total_size: self.total_size,
242            sizes: self.sizes.with_context(ctx.clone()).await,
243            stored_hash: self.stored_hash,
244            hash: Mutex::new(hash),
245        }
246    }
247}
248
249impl<C: Context> View for KeyValueStoreView<C> {
250    const NUM_INIT_KEYS: usize = 2 + ByteMapView::<C, u32>::NUM_INIT_KEYS;
251
252    type Context = C;
253
254    fn context(&self) -> &C {
255        &self.context
256    }
257
258    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
259        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
260        let key_total_size = context.base_key().base_tag(KeyTag::TotalSize as u8);
261        let mut v = vec![key_hash, key_total_size];
262        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
263        let context_sizes = context.clone_with_base_key(base_key);
264        v.extend(ByteMapView::<C, u32>::pre_load(&context_sizes)?);
265        Ok(v)
266    }
267
268    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
269        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
270        let total_size =
271            from_bytes_option_or_default(values.get(1).ok_or(ViewError::PostLoadValuesError)?)?;
272        let base_key = context.base_key().base_tag(KeyTag::Sizes as u8);
273        let context_sizes = context.clone_with_base_key(base_key);
274        let sizes = ByteMapView::post_load(
275            context_sizes,
276            values.get(2..).ok_or(ViewError::PostLoadValuesError)?,
277        )?;
278        Ok(Self {
279            context,
280            deletion_set: DeletionSet::new(),
281            updates: BTreeMap::new(),
282            stored_total_size: total_size,
283            total_size,
284            sizes,
285            stored_hash: hash,
286            hash: Mutex::new(hash),
287        })
288    }
289
290    fn rollback(&mut self) {
291        self.deletion_set.rollback();
292        self.updates.clear();
293        self.total_size = self.stored_total_size;
294        self.sizes.rollback();
295        *self.hash.get_mut().unwrap() = self.stored_hash;
296    }
297
298    async fn has_pending_changes(&self) -> bool {
299        if self.deletion_set.has_pending_changes() {
300            return true;
301        }
302        if !self.updates.is_empty() {
303            return true;
304        }
305        if self.stored_total_size != self.total_size {
306            return true;
307        }
308        if self.sizes.has_pending_changes().await {
309            return true;
310        }
311        let hash = self.hash.lock().unwrap();
312        self.stored_hash != *hash
313    }
314
315    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
316        let mut delete_view = false;
317        if self.deletion_set.delete_storage_first {
318            delete_view = true;
319            batch.delete_key_prefix(self.context.base_key().bytes.clone());
320            for (index, update) in self.updates.iter() {
321                if let Update::Set(value) = update {
322                    let key = self
323                        .context
324                        .base_key()
325                        .base_tag_index(KeyTag::Index as u8, index);
326                    batch.put_key_value_bytes(key, value.clone());
327                    delete_view = false;
328                }
329            }
330        } else {
331            for index in self.deletion_set.deleted_prefixes.iter() {
332                let key = self
333                    .context
334                    .base_key()
335                    .base_tag_index(KeyTag::Index as u8, index);
336                batch.delete_key_prefix(key);
337            }
338            for (index, update) in self.updates.iter() {
339                let key = self
340                    .context
341                    .base_key()
342                    .base_tag_index(KeyTag::Index as u8, index);
343                match update {
344                    Update::Removed => batch.delete_key(key),
345                    Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
346                }
347            }
348        }
349        self.sizes.pre_save(batch)?;
350        let hash = *self.hash.lock().unwrap();
351        if self.stored_hash != hash {
352            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
353            match hash {
354                None => batch.delete_key(key),
355                Some(hash) => batch.put_key_value(key, &hash)?,
356            }
357        }
358        if self.stored_total_size != self.total_size {
359            let key = self.context.base_key().base_tag(KeyTag::TotalSize as u8);
360            batch.put_key_value(key, &self.total_size)?;
361        }
362        Ok(delete_view)
363    }
364
365    fn post_save(&mut self) {
366        self.deletion_set.delete_storage_first = false;
367        self.deletion_set.deleted_prefixes.clear();
368        self.updates.clear();
369        self.sizes.post_save();
370        let hash = *self.hash.lock().unwrap();
371        self.stored_hash = hash;
372        self.stored_total_size = self.total_size;
373    }
374
375    fn clear(&mut self) {
376        self.deletion_set.clear();
377        self.updates.clear();
378        self.total_size = SizeData::default();
379        self.sizes.clear();
380        *self.hash.get_mut().unwrap() = None;
381    }
382}
383
384impl<C: Context> ClonableView for KeyValueStoreView<C> {
385    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
386        Ok(KeyValueStoreView {
387            context: self.context.clone(),
388            deletion_set: self.deletion_set.clone(),
389            updates: self.updates.clone(),
390            stored_total_size: self.stored_total_size,
391            total_size: self.total_size,
392            sizes: self.sizes.clone_unchecked()?,
393            stored_hash: self.stored_hash,
394            hash: Mutex::new(*self.hash.get_mut().unwrap()),
395        })
396    }
397}
398
399impl<C: Context> KeyValueStoreView<C> {
400    fn max_key_size(&self) -> usize {
401        let prefix_len = self.context.base_key().bytes.len();
402        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
403    }
404
405    /// Getting the total sizes that will be used for keys and values when stored
406    /// ```rust
407    /// # tokio_test::block_on(async {
408    /// # use linera_views::context::MemoryContext;
409    /// # use linera_views::key_value_store_view::{KeyValueStoreView, SizeData};
410    /// # use linera_views::views::View;
411    /// # let context = MemoryContext::new_for_testing(());
412    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
413    /// let total_size = view.total_size();
414    /// assert_eq!(total_size, SizeData::default());
415    /// # })
416    /// ```
417    pub fn total_size(&self) -> SizeData {
418        self.total_size
419    }
420
421    /// Applies the function f over all indices. If the function f returns
422    /// false, then the loop ends prematurely.
423    /// ```rust
424    /// # tokio_test::block_on(async {
425    /// # use linera_views::context::MemoryContext;
426    /// # use linera_views::key_value_store_view::KeyValueStoreView;
427    /// # use linera_views::views::View;
428    /// # let context = MemoryContext::new_for_testing(());
429    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
430    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
431    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
432    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
433    /// let mut count = 0;
434    /// view.for_each_index_while(|_key| {
435    ///     count += 1;
436    ///     Ok(count < 2)
437    /// })
438    /// .await
439    /// .unwrap();
440    /// assert_eq!(count, 2);
441    /// # })
442    /// ```
443    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
444    where
445        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
446    {
447        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
448        let mut updates = self.updates.iter();
449        let mut update = updates.next();
450        if !self.deletion_set.delete_storage_first {
451            let mut suffix_closed_set =
452                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
453            for index in self
454                .context
455                .store()
456                .find_keys_by_prefix(&key_prefix)
457                .await?
458            {
459                loop {
460                    match update {
461                        Some((key, value)) if key <= &index => {
462                            if let Update::Set(_) = value {
463                                if !f(key)? {
464                                    return Ok(());
465                                }
466                            }
467                            update = updates.next();
468                            if key == &index {
469                                break;
470                            }
471                        }
472                        _ => {
473                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
474                                return Ok(());
475                            }
476                            break;
477                        }
478                    }
479                }
480            }
481        }
482        while let Some((key, value)) = update {
483            if let Update::Set(_) = value {
484                if !f(key)? {
485                    return Ok(());
486                }
487            }
488            update = updates.next();
489        }
490        Ok(())
491    }
492
493    /// Applies the function f over all indices.
494    /// ```rust
495    /// # tokio_test::block_on(async {
496    /// # use linera_views::context::MemoryContext;
497    /// # use linera_views::key_value_store_view::KeyValueStoreView;
498    /// # use linera_views::views::View;
499    /// # let context = MemoryContext::new_for_testing(());
500    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
501    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
502    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
503    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
504    /// let mut count = 0;
505    /// view.for_each_index(|_key| {
506    ///     count += 1;
507    ///     Ok(())
508    /// })
509    /// .await
510    /// .unwrap();
511    /// assert_eq!(count, 3);
512    /// # })
513    /// ```
514    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
515    where
516        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
517    {
518        self.for_each_index_while(|key| {
519            f(key)?;
520            Ok(true)
521        })
522        .await
523    }
524
525    /// Applies the function f over all index/value pairs.
526    /// If the function f returns false then the loop ends prematurely.
527    /// ```rust
528    /// # tokio_test::block_on(async {
529    /// # use linera_views::context::MemoryContext;
530    /// # use linera_views::key_value_store_view::KeyValueStoreView;
531    /// # use linera_views::views::View;
532    /// # let context = MemoryContext::new_for_testing(());
533    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
534    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
535    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
536    /// let mut values = Vec::new();
537    /// view.for_each_index_value_while(|_key, value| {
538    ///     values.push(value.to_vec());
539    ///     Ok(values.len() < 1)
540    /// })
541    /// .await
542    /// .unwrap();
543    /// assert_eq!(values, vec![vec![0]]);
544    /// # })
545    /// ```
546    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
547    where
548        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
549    {
550        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
551        let mut updates = self.updates.iter();
552        let mut update = updates.next();
553        if !self.deletion_set.delete_storage_first {
554            let mut suffix_closed_set =
555                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
556            for entry in self
557                .context
558                .store()
559                .find_key_values_by_prefix(&key_prefix)
560                .await?
561            {
562                let (index, index_val) = entry;
563                loop {
564                    match update {
565                        Some((key, value)) if key <= &index => {
566                            if let Update::Set(value) = value {
567                                if !f(key, value)? {
568                                    return Ok(());
569                                }
570                            }
571                            update = updates.next();
572                            if key == &index {
573                                break;
574                            }
575                        }
576                        _ => {
577                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
578                                return Ok(());
579                            }
580                            break;
581                        }
582                    }
583                }
584            }
585        }
586        while let Some((key, value)) = update {
587            if let Update::Set(value) = value {
588                if !f(key, value)? {
589                    return Ok(());
590                }
591            }
592            update = updates.next();
593        }
594        Ok(())
595    }
596
597    /// Applies the function f over all index/value pairs.
598    /// ```rust
599    /// # tokio_test::block_on(async {
600    /// # use linera_views::context::MemoryContext;
601    /// # use linera_views::key_value_store_view::KeyValueStoreView;
602    /// # use linera_views::views::View;
603    /// # let context = MemoryContext::new_for_testing(());
604    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
605    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
606    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
607    /// let mut part_keys = Vec::new();
608    /// view.for_each_index_while(|key| {
609    ///     part_keys.push(key.to_vec());
610    ///     Ok(part_keys.len() < 1)
611    /// })
612    /// .await
613    /// .unwrap();
614    /// assert_eq!(part_keys, vec![vec![0, 1]]);
615    /// # })
616    /// ```
617    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
618    where
619        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
620    {
621        self.for_each_index_value_while(|key, value| {
622            f(key, value)?;
623            Ok(true)
624        })
625        .await
626    }
627
628    /// Returns the list of indices in lexicographic order.
629    /// ```rust
630    /// # tokio_test::block_on(async {
631    /// # use linera_views::context::MemoryContext;
632    /// # use linera_views::key_value_store_view::KeyValueStoreView;
633    /// # use linera_views::views::View;
634    /// # let context = MemoryContext::new_for_testing(());
635    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
636    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
637    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
638    /// let indices = view.indices().await.unwrap();
639    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
640    /// # })
641    /// ```
642    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
643        let mut indices = Vec::new();
644        self.for_each_index(|index| {
645            indices.push(index.to_vec());
646            Ok(())
647        })
648        .await?;
649        Ok(indices)
650    }
651
652    /// Returns the list of indices and values in lexicographic order.
653    /// ```rust
654    /// # tokio_test::block_on(async {
655    /// # use linera_views::context::MemoryContext;
656    /// # use linera_views::key_value_store_view::KeyValueStoreView;
657    /// # use linera_views::views::View;
658    /// # let context = MemoryContext::new_for_testing(());
659    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
660    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
661    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
662    /// let key_values = view.indices().await.unwrap();
663    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
664    /// # })
665    /// ```
666    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
667        let mut index_values = Vec::new();
668        self.for_each_index_value(|index, value| {
669            index_values.push((index.to_vec(), value.to_vec()));
670            Ok(())
671        })
672        .await?;
673        Ok(index_values)
674    }
675
676    /// Returns the number of entries.
677    /// ```rust
678    /// # tokio_test::block_on(async {
679    /// # use linera_views::context::MemoryContext;
680    /// # use linera_views::key_value_store_view::KeyValueStoreView;
681    /// # use linera_views::views::View;
682    /// # let context = MemoryContext::new_for_testing(());
683    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
684    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
685    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
686    /// let count = view.count().await.unwrap();
687    /// assert_eq!(count, 2);
688    /// # })
689    /// ```
690    pub async fn count(&self) -> Result<usize, ViewError> {
691        let mut count = 0;
692        self.for_each_index(|_index| {
693            count += 1;
694            Ok(())
695        })
696        .await?;
697        Ok(count)
698    }
699
700    /// Obtains the value at the given index, if any.
701    /// ```rust
702    /// # tokio_test::block_on(async {
703    /// # use linera_views::context::MemoryContext;
704    /// # use linera_views::key_value_store_view::KeyValueStoreView;
705    /// # use linera_views::views::View;
706    /// # let context = MemoryContext::new_for_testing(());
707    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
708    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
709    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
710    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
711    /// # })
712    /// ```
713    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
714        #[cfg(with_metrics)]
715        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
716        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
717        if let Some(update) = self.updates.get(index) {
718            let value = match update {
719                Update::Removed => None,
720                Update::Set(value) => Some(value.clone()),
721            };
722            return Ok(value);
723        }
724        if self.deletion_set.contains_prefix_of(index) {
725            return Ok(None);
726        }
727        let key = self
728            .context
729            .base_key()
730            .base_tag_index(KeyTag::Index as u8, index);
731        Ok(self.context.store().read_value_bytes(&key).await?)
732    }
733
734    /// Tests whether the store contains a specific index.
735    /// ```rust
736    /// # tokio_test::block_on(async {
737    /// # use linera_views::context::MemoryContext;
738    /// # use linera_views::key_value_store_view::KeyValueStoreView;
739    /// # use linera_views::views::View;
740    /// # let context = MemoryContext::new_for_testing(());
741    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
742    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
743    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
744    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
745    /// # })
746    /// ```
747    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
748        #[cfg(with_metrics)]
749        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
750        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
751        if let Some(update) = self.updates.get(index) {
752            let test = match update {
753                Update::Removed => false,
754                Update::Set(_value) => true,
755            };
756            return Ok(test);
757        }
758        if self.deletion_set.contains_prefix_of(index) {
759            return Ok(false);
760        }
761        let key = self
762            .context
763            .base_key()
764            .base_tag_index(KeyTag::Index as u8, index);
765        Ok(self.context.store().contains_key(&key).await?)
766    }
767
768    /// Tests whether the view contains a range of indices
769    /// ```rust
770    /// # tokio_test::block_on(async {
771    /// # use linera_views::context::MemoryContext;
772    /// # use linera_views::key_value_store_view::KeyValueStoreView;
773    /// # use linera_views::views::View;
774    /// # let context = MemoryContext::new_for_testing(());
775    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
776    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
777    /// let keys = vec![vec![0, 1], vec![0, 2]];
778    /// let results = view.contains_keys(&keys).await.unwrap();
779    /// assert_eq!(results, vec![true, false]);
780    /// # })
781    /// ```
782    pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
783        #[cfg(with_metrics)]
784        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
785        let mut results = Vec::with_capacity(indices.len());
786        let mut missed_indices = Vec::new();
787        let mut vector_query = Vec::new();
788        for (i, index) in indices.iter().enumerate() {
789            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
790            if let Some(update) = self.updates.get(index) {
791                let value = match update {
792                    Update::Removed => false,
793                    Update::Set(_) => true,
794                };
795                results.push(value);
796            } else {
797                results.push(false);
798                if !self.deletion_set.contains_prefix_of(index) {
799                    missed_indices.push(i);
800                    let key = self
801                        .context
802                        .base_key()
803                        .base_tag_index(KeyTag::Index as u8, index);
804                    vector_query.push(key);
805                }
806            }
807        }
808        let values = self.context.store().contains_keys(&vector_query).await?;
809        for (i, value) in missed_indices.into_iter().zip(values) {
810            results[i] = value;
811        }
812        Ok(results)
813    }
814
815    /// Obtains the values of a range of indices
816    /// ```rust
817    /// # tokio_test::block_on(async {
818    /// # use linera_views::context::MemoryContext;
819    /// # use linera_views::key_value_store_view::KeyValueStoreView;
820    /// # use linera_views::views::View;
821    /// # let context = MemoryContext::new_for_testing(());
822    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
823    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
824    /// assert_eq!(
825    ///     view.multi_get(&[vec![0, 1], vec![0, 2]]).await.unwrap(),
826    ///     vec![Some(vec![42]), None]
827    /// );
828    /// # })
829    /// ```
830    pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
831        #[cfg(with_metrics)]
832        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
833        let mut result = Vec::with_capacity(indices.len());
834        let mut missed_indices = Vec::new();
835        let mut vector_query = Vec::new();
836        for (i, index) in indices.iter().enumerate() {
837            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
838            if let Some(update) = self.updates.get(index) {
839                let value = match update {
840                    Update::Removed => None,
841                    Update::Set(value) => Some(value.clone()),
842                };
843                result.push(value);
844            } else {
845                result.push(None);
846                if !self.deletion_set.contains_prefix_of(index) {
847                    missed_indices.push(i);
848                    let key = self
849                        .context
850                        .base_key()
851                        .base_tag_index(KeyTag::Index as u8, index);
852                    vector_query.push(key);
853                }
854            }
855        }
856        let values = self
857            .context
858            .store()
859            .read_multi_values_bytes(&vector_query)
860            .await?;
861        for (i, value) in missed_indices.into_iter().zip(values) {
862            result[i] = value;
863        }
864        Ok(result)
865    }
866
867    /// Applies the given batch of `crate::common::WriteOperation`.
868    /// ```rust
869    /// # tokio_test::block_on(async {
870    /// # use linera_views::context::MemoryContext;
871    /// # use linera_views::key_value_store_view::KeyValueStoreView;
872    /// # use linera_views::batch::Batch;
873    /// # use linera_views::views::View;
874    /// # let context = MemoryContext::new_for_testing(());
875    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
876    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
877    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
878    /// let mut batch = Batch::new();
879    /// batch.delete_key_prefix(vec![0]);
880    /// view.write_batch(batch).await.unwrap();
881    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
882    /// assert_eq!(key_values, vec![]);
883    /// # })
884    /// ```
885    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
886        #[cfg(with_metrics)]
887        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
888        *self.hash.get_mut().unwrap() = None;
889        let max_key_size = self.max_key_size();
890        for operation in batch.operations {
891            match operation {
892                WriteOperation::Delete { key } => {
893                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
894                    if let Some(value) = self.sizes.get(&key).await? {
895                        let entry_size = SizeData {
896                            key: u32::try_from(key.len()).map_err(|_| ArithmeticError::Overflow)?,
897                            value,
898                        };
899                        self.total_size.sub_assign(entry_size);
900                    }
901                    self.sizes.remove(key.clone());
902                    if self.deletion_set.contains_prefix_of(&key) {
903                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
904                        self.updates.remove(&key);
905                    } else {
906                        self.updates.insert(key, Update::Removed);
907                    }
908                }
909                WriteOperation::Put { key, value } => {
910                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
911                    let entry_size = SizeData {
912                        key: key.len() as u32,
913                        value: value.len() as u32,
914                    };
915                    self.total_size.add_assign(entry_size)?;
916                    if let Some(value) = self.sizes.get(&key).await? {
917                        let entry_size = SizeData {
918                            key: key.len() as u32,
919                            value,
920                        };
921                        self.total_size.sub_assign(entry_size);
922                    }
923                    self.sizes.insert(key.clone(), entry_size.value);
924                    self.updates.insert(key, Update::Set(value));
925                }
926                WriteOperation::DeletePrefix { key_prefix } => {
927                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
928                    let key_list = self
929                        .updates
930                        .range(get_key_range_for_prefix(key_prefix.clone()))
931                        .map(|x| x.0.to_vec())
932                        .collect::<Vec<_>>();
933                    for key in key_list {
934                        self.updates.remove(&key);
935                    }
936                    let key_values = self.sizes.key_values_by_prefix(key_prefix.clone()).await?;
937                    for (key, value) in key_values {
938                        let entry_size = SizeData {
939                            key: key.len() as u32,
940                            value,
941                        };
942                        self.total_size.sub_assign(entry_size);
943                        self.sizes.remove(key);
944                    }
945                    self.sizes.remove_by_prefix(key_prefix.clone());
946                    self.deletion_set.insert_key_prefix(key_prefix);
947                }
948            }
949        }
950        Ok(())
951    }
952
953    /// Sets or inserts a value.
954    /// ```rust
955    /// # tokio_test::block_on(async {
956    /// # use linera_views::context::MemoryContext;
957    /// # use linera_views::key_value_store_view::KeyValueStoreView;
958    /// # use linera_views::views::View;
959    /// # let context = MemoryContext::new_for_testing(());
960    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
961    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
962    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
963    /// # })
964    /// ```
965    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
966        let mut batch = Batch::new();
967        batch.put_key_value_bytes(index, value);
968        self.write_batch(batch).await
969    }
970
971    /// Removes a value. If absent then the action has no effect.
972    /// ```rust
973    /// # tokio_test::block_on(async {
974    /// # use linera_views::context::MemoryContext;
975    /// # use linera_views::key_value_store_view::KeyValueStoreView;
976    /// # use linera_views::views::View;
977    /// # let context = MemoryContext::new_for_testing(());
978    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
979    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
980    /// view.remove(vec![0, 1]).await.unwrap();
981    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
982    /// # })
983    /// ```
984    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
985        let mut batch = Batch::new();
986        batch.delete_key(index);
987        self.write_batch(batch).await
988    }
989
990    /// Deletes a key prefix.
991    /// ```rust
992    /// # tokio_test::block_on(async {
993    /// # use linera_views::context::MemoryContext;
994    /// # use linera_views::key_value_store_view::KeyValueStoreView;
995    /// # use linera_views::views::View;
996    /// # let context = MemoryContext::new_for_testing(());
997    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
998    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
999    /// view.remove_by_prefix(vec![0]).await.unwrap();
1000    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
1001    /// # })
1002    /// ```
1003    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
1004        let mut batch = Batch::new();
1005        batch.delete_key_prefix(key_prefix);
1006        self.write_batch(batch).await
1007    }
1008
1009    /// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
1010    /// ```rust
1011    /// # tokio_test::block_on(async {
1012    /// # use linera_views::context::MemoryContext;
1013    /// # use linera_views::key_value_store_view::KeyValueStoreView;
1014    /// # use linera_views::views::View;
1015    /// # let context = MemoryContext::new_for_testing(());
1016    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1017    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1018    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1019    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
1020    /// assert_eq!(keys, vec![vec![1]]);
1021    /// # })
1022    /// ```
1023    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
1024        #[cfg(with_metrics)]
1025        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
1026        ensure!(
1027            key_prefix.len() <= self.max_key_size(),
1028            ViewError::KeyTooLong
1029        );
1030        let len = key_prefix.len();
1031        let key_prefix_full = self
1032            .context
1033            .base_key()
1034            .base_tag_index(KeyTag::Index as u8, key_prefix);
1035        let mut keys = Vec::new();
1036        let key_prefix_upper = get_upper_bound(key_prefix);
1037        let mut updates = self
1038            .updates
1039            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1040        let mut update = updates.next();
1041        if !self.deletion_set.delete_storage_first {
1042            let mut suffix_closed_set =
1043                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1044            for key in self
1045                .context
1046                .store()
1047                .find_keys_by_prefix(&key_prefix_full)
1048                .await?
1049            {
1050                loop {
1051                    match update {
1052                        Some((update_key, update_value))
1053                            if &update_key[len..] <= key.as_slice() =>
1054                        {
1055                            if let Update::Set(_) = update_value {
1056                                keys.push(update_key[len..].to_vec());
1057                            }
1058                            update = updates.next();
1059                            if update_key[len..] == key[..] {
1060                                break;
1061                            }
1062                        }
1063                        _ => {
1064                            let mut key_with_prefix = key_prefix.to_vec();
1065                            key_with_prefix.extend_from_slice(&key);
1066                            if !suffix_closed_set.find_key(&key_with_prefix) {
1067                                keys.push(key);
1068                            }
1069                            break;
1070                        }
1071                    }
1072                }
1073            }
1074        }
1075        while let Some((update_key, update_value)) = update {
1076            if let Update::Set(_) = update_value {
1077                let update_key = update_key[len..].to_vec();
1078                keys.push(update_key);
1079            }
1080            update = updates.next();
1081        }
1082        Ok(keys)
1083    }
1084
1085    /// Iterates over all the key-value pairs, for keys matching the given prefix. The
1086    /// prefix is not included in the returned keys.
1087    /// ```rust
1088    /// # tokio_test::block_on(async {
1089    /// # use linera_views::context::MemoryContext;
1090    /// # use linera_views::key_value_store_view::KeyValueStoreView;
1091    /// # use linera_views::views::View;
1092    /// # let context = MemoryContext::new_for_testing(());
1093    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
1094    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
1095    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1096    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
1097    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
1098    /// # })
1099    /// ```
1100    pub async fn find_key_values_by_prefix(
1101        &self,
1102        key_prefix: &[u8],
1103    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
1104        #[cfg(with_metrics)]
1105        let _latency =
1106            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
1107        ensure!(
1108            key_prefix.len() <= self.max_key_size(),
1109            ViewError::KeyTooLong
1110        );
1111        let len = key_prefix.len();
1112        let key_prefix_full = self
1113            .context
1114            .base_key()
1115            .base_tag_index(KeyTag::Index as u8, key_prefix);
1116        let mut key_values = Vec::new();
1117        let key_prefix_upper = get_upper_bound(key_prefix);
1118        let mut updates = self
1119            .updates
1120            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1121        let mut update = updates.next();
1122        if !self.deletion_set.delete_storage_first {
1123            let mut suffix_closed_set =
1124                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1125            for entry in self
1126                .context
1127                .store()
1128                .find_key_values_by_prefix(&key_prefix_full)
1129                .await?
1130            {
1131                let (key, value) = entry;
1132                loop {
1133                    match update {
1134                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
1135                            if let Update::Set(update_value) = update_value {
1136                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1137                                key_values.push(key_value);
1138                            }
1139                            update = updates.next();
1140                            if update_key[len..] == key[..] {
1141                                break;
1142                            }
1143                        }
1144                        _ => {
1145                            let mut key_with_prefix = key_prefix.to_vec();
1146                            key_with_prefix.extend_from_slice(&key);
1147                            if !suffix_closed_set.find_key(&key_with_prefix) {
1148                                key_values.push((key, value));
1149                            }
1150                            break;
1151                        }
1152                    }
1153                }
1154            }
1155        }
1156        while let Some((update_key, update_value)) = update {
1157            if let Update::Set(update_value) = update_value {
1158                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1159                key_values.push(key_value);
1160            }
1161            update = updates.next();
1162        }
1163        Ok(key_values)
1164    }
1165
1166    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1167        #[cfg(with_metrics)]
1168        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1169        let mut hasher = sha3::Sha3_256::default();
1170        let mut count = 0u32;
1171        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1172            count += 1;
1173            hasher.update_with_bytes(index)?;
1174            hasher.update_with_bytes(value)?;
1175            Ok(())
1176        })
1177        .await?;
1178        hasher.update_with_bcs_bytes(&count)?;
1179        Ok(hasher.finalize())
1180    }
1181}
1182
1183impl<C: Context> HashableView for KeyValueStoreView<C> {
1184    type Hasher = sha3::Sha3_256;
1185
1186    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1187        let hash = *self.hash.get_mut().unwrap();
1188        match hash {
1189            Some(hash) => Ok(hash),
1190            None => {
1191                let new_hash = self.compute_hash().await?;
1192                let hash = self.hash.get_mut().unwrap();
1193                *hash = Some(new_hash);
1194                Ok(new_hash)
1195            }
1196        }
1197    }
1198
1199    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1200        let hash = *self.hash.lock().unwrap();
1201        match hash {
1202            Some(hash) => Ok(hash),
1203            None => {
1204                let new_hash = self.compute_hash().await?;
1205                let mut hash = self.hash.lock().unwrap();
1206                *hash = Some(new_hash);
1207                Ok(new_hash)
1208            }
1209        }
1210    }
1211}
1212
1213/// Type wrapping `KeyValueStoreView` while memoizing the hash.
1214pub type HashedKeyValueStoreView<C> =
1215    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;
1216
1217/// Wrapper around `KeyValueStoreView` to compute hashes based on the history of changes.
1218pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1219
1220/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
1221#[cfg(with_testing)]
1222#[derive(Debug, Clone)]
1223pub struct ViewContainer<C> {
1224    view: Arc<RwLock<KeyValueStoreView<C>>>,
1225}
1226
1227#[cfg(with_testing)]
1228impl<C> WithError for ViewContainer<C> {
1229    type Error = ViewContainerError;
1230}
1231
1232#[cfg(with_testing)]
1233/// The error type for [`ViewContainer`] operations.
1234#[derive(Error, Debug)]
1235pub enum ViewContainerError {
1236    /// View error.
1237    #[error(transparent)]
1238    ViewError(#[from] ViewError),
1239
1240    /// BCS serialization error.
1241    #[error(transparent)]
1242    BcsError(#[from] bcs::Error),
1243}
1244
1245#[cfg(with_testing)]
1246impl KeyValueStoreError for ViewContainerError {
1247    const BACKEND: &'static str = "view_container";
1248}
1249
1250#[cfg(with_testing)]
1251impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
1252    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
1253
1254    fn max_stream_queries(&self) -> usize {
1255        1
1256    }
1257
1258    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
1259        Ok(Vec::new())
1260    }
1261
1262    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
1263        let view = self.view.read().await;
1264        Ok(view.get(key).await?)
1265    }
1266
1267    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
1268        let view = self.view.read().await;
1269        Ok(view.contains_key(key).await?)
1270    }
1271
1272    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
1273        let view = self.view.read().await;
1274        Ok(view.contains_keys(keys).await?)
1275    }
1276
1277    async fn read_multi_values_bytes(
1278        &self,
1279        keys: &[Vec<u8>],
1280    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
1281        let view = self.view.read().await;
1282        Ok(view.multi_get(keys).await?)
1283    }
1284
1285    async fn find_keys_by_prefix(
1286        &self,
1287        key_prefix: &[u8],
1288    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
1289        let view = self.view.read().await;
1290        Ok(view.find_keys_by_prefix(key_prefix).await?)
1291    }
1292
1293    async fn find_key_values_by_prefix(
1294        &self,
1295        key_prefix: &[u8],
1296    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
1297        let view = self.view.read().await;
1298        Ok(view.find_key_values_by_prefix(key_prefix).await?)
1299    }
1300}
1301
1302#[cfg(with_testing)]
1303impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
1304    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;
1305
1306    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
1307        let mut view = self.view.write().await;
1308        view.write_batch(batch).await?;
1309        let mut batch = Batch::new();
1310        view.pre_save(&mut batch)?;
1311        view.post_save();
1312        view.context()
1313            .store()
1314            .write_batch(batch)
1315            .await
1316            .map_err(ViewError::from)?;
1317        Ok(())
1318    }
1319
1320    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
1321        Ok(())
1322    }
1323}
1324
1325#[cfg(with_testing)]
1326impl<C: Context> ViewContainer<C> {
1327    /// Creates a [`ViewContainer`].
1328    pub async fn new(context: C) -> Result<Self, ViewError> {
1329        let view = KeyValueStoreView::load(context).await?;
1330        let view = Arc::new(RwLock::new(view));
1331        Ok(Self { view })
1332    }
1333}