Skip to main content

linera_views/views/
key_value_store_view.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! We implement two types:
5//! 1) The first type `KeyValueStoreView` implements View and the function of `KeyValueStore`.
6//!
7//! 2) The second type `ViewContainer` encapsulates `KeyValueStoreView` and provides the following functionalities:
8//!    * The `Clone` trait
9//!    * a `write_batch` that takes a `&self` instead of a `&mut self`
10//!    * a `write_batch` that writes in the context instead of writing of the view.
11//!
12//! Currently, that second type is only used for tests.
13//!
14//! Key tags to create the sub-keys of a `KeyValueStoreView` on top of the base key.
15
16use std::{collections::BTreeMap, fmt::Debug, ops::Bound::Included, sync::Mutex};
17
18use allocative::Allocative;
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{data_types::ArithmeticError, ensure, visit_allocative_simple};
22use serde::{Deserialize, Serialize};
23
24use crate::{
25    batch::{Batch, WriteOperation},
26    common::{
27        from_bytes_option, get_key_range_for_prefix, get_upper_bound, DeletionSet, HasherOutput,
28        SuffixClosedSetIterator, Update,
29    },
30    context::Context,
31    hashable_wrapper::WrappedHashableContainerView,
32    historical_hash_wrapper::HistoricallyHashableView,
33    store::ReadableKeyValueStore,
34    views::{ClonableView, HashableView, Hasher, ReplaceContext, View, ViewError, MIN_VIEW_TAG},
35};
36
37#[cfg(with_metrics)]
38mod metrics {
39    use std::sync::LazyLock;
40
41    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
42    use prometheus::HistogramVec;
43
44    /// The latency of hash computation
45    pub static KEY_VALUE_STORE_VIEW_HASH_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
46        register_histogram_vec(
47            "key_value_store_view_hash_latency",
48            "KeyValueStoreView hash latency",
49            &[],
50            exponential_bucket_latencies(5.0),
51        )
52    });
53
54    /// The latency of get operation
55    pub static KEY_VALUE_STORE_VIEW_GET_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
56        register_histogram_vec(
57            "key_value_store_view_get_latency",
58            "KeyValueStoreView get latency",
59            &[],
60            exponential_bucket_latencies(5.0),
61        )
62    });
63
64    /// The latency of multi get
65    pub static KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY: LazyLock<HistogramVec> =
66        LazyLock::new(|| {
67            register_histogram_vec(
68                "key_value_store_view_multi_get_latency",
69                "KeyValueStoreView multi get latency",
70                &[],
71                exponential_bucket_latencies(5.0),
72            )
73        });
74
75    /// The latency of contains key
76    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY: LazyLock<HistogramVec> =
77        LazyLock::new(|| {
78            register_histogram_vec(
79                "key_value_store_view_contains_key_latency",
80                "KeyValueStoreView contains key latency",
81                &[],
82                exponential_bucket_latencies(5.0),
83            )
84        });
85
86    /// The latency of contains keys
87    pub static KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY: LazyLock<HistogramVec> =
88        LazyLock::new(|| {
89            register_histogram_vec(
90                "key_value_store_view_contains_keys_latency",
91                "KeyValueStoreView contains keys latency",
92                &[],
93                exponential_bucket_latencies(5.0),
94            )
95        });
96
97    /// The latency of find keys by prefix operation
98    pub static KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
99        LazyLock::new(|| {
100            register_histogram_vec(
101                "key_value_store_view_find_keys_by_prefix_latency",
102                "KeyValueStoreView find keys by prefix latency",
103                &[],
104                exponential_bucket_latencies(5.0),
105            )
106        });
107
108    /// The latency of find key values by prefix operation
109    pub static KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY: LazyLock<HistogramVec> =
110        LazyLock::new(|| {
111            register_histogram_vec(
112                "key_value_store_view_find_key_values_by_prefix_latency",
113                "KeyValueStoreView find key values by prefix latency",
114                &[],
115                exponential_bucket_latencies(5.0),
116            )
117        });
118
119    /// The latency of write batch operation
120    pub static KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY: LazyLock<HistogramVec> =
121        LazyLock::new(|| {
122            register_histogram_vec(
123                "key_value_store_view_write_batch_latency",
124                "KeyValueStoreView write batch latency",
125                &[],
126                exponential_bucket_latencies(5.0),
127            )
128        });
129}
130
131#[cfg(with_testing)]
132use {
133    crate::store::{KeyValueStoreError, WithError, WritableKeyValueStore},
134    async_lock::RwLock,
135    std::sync::Arc,
136    thiserror::Error,
137};
138
139#[repr(u8)]
140enum KeyTag {
141    /// Prefix for the indices of the view.
142    Index = MIN_VIEW_TAG,
143    /// Prefix for the hash. We fix the index for compatibility with existing contracts.
144    Hash = MIN_VIEW_TAG + 3,
145}
146
147/// A pair containing the key and value size.
148#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Allocative)]
149pub struct SizeData {
150    /// The size of the key
151    pub key: u32,
152    /// The size of the value
153    pub value: u32,
154}
155
156impl SizeData {
157    /// Sums both terms
158    pub fn sum(&mut self) -> u32 {
159        self.key + self.value
160    }
161
162    /// Adds a size to `self`
163    pub fn add_assign(&mut self, size: SizeData) -> Result<(), ViewError> {
164        self.key = self
165            .key
166            .checked_add(size.key)
167            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
168        self.value = self
169            .value
170            .checked_add(size.value)
171            .ok_or(ViewError::ArithmeticError(ArithmeticError::Overflow))?;
172        Ok(())
173    }
174
175    /// Subtracts a size from `self`
176    pub fn sub_assign(&mut self, size: SizeData) {
177        self.key -= size.key;
178        self.value -= size.value;
179    }
180}
181
182/// A view that represents the functions of `KeyValueStore`.
183///
184/// Comment on the data set:
185/// In order to work, the view needs to store the updates and deleted prefixes.
186/// The updates and deleted prefixes have to be coherent. This means:
187/// * If an index is deleted by one in deleted prefixes then it should not be present
188///   in the updates at all.
189/// * [`DeletePrefix::key_prefix`][entry1] should not dominate anyone. That is if we have `[0,2]`
190///   then we should not have `[0,2,3]` since it would be dominated by the preceding.
191///
192/// With that we have:
193/// * in order to test if an `index` is deleted by a prefix we compute the highest deleted prefix `dp`
194///   such that `dp <= index`.
195///   If `dp` is indeed a prefix then we conclude that `index` is deleted, otherwise not.
196///   The no domination is essential here.
197///
198/// [entry1]: crate::batch::WriteOperation::DeletePrefix
199#[derive(Debug, Allocative)]
200#[allocative(bound = "C")]
201pub struct KeyValueStoreView<C> {
202    /// The view context.
203    #[allocative(skip)]
204    context: C,
205    /// Tracks deleted key prefixes.
206    deletion_set: DeletionSet,
207    /// Pending changes not yet persisted to storage.
208    updates: BTreeMap<Vec<u8>, Update<Vec<u8>>>,
209    /// The hash persisted in storage.
210    #[allocative(visit = visit_allocative_simple)]
211    stored_hash: Option<HasherOutput>,
212    /// Memoized hash, if any.
213    #[allocative(visit = visit_allocative_simple)]
214    hash: Mutex<Option<HasherOutput>>,
215}
216
217impl<C: Context, C2: Context> ReplaceContext<C2> for KeyValueStoreView<C> {
218    type Target = KeyValueStoreView<C2>;
219
220    async fn with_context(
221        &mut self,
222        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
223    ) -> Self::Target {
224        let hash = *self.hash.lock().unwrap();
225        KeyValueStoreView {
226            context: ctx.clone()(self.context()),
227            deletion_set: self.deletion_set.clone(),
228            updates: self.updates.clone(),
229            stored_hash: self.stored_hash,
230            hash: Mutex::new(hash),
231        }
232    }
233}
234
235impl<C: Context> View for KeyValueStoreView<C> {
236    const NUM_INIT_KEYS: usize = 1;
237
238    type Context = C;
239
240    fn context(&self) -> &C {
241        &self.context
242    }
243
244    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
245        let key_hash = context.base_key().base_tag(KeyTag::Hash as u8);
246        let v = vec![key_hash];
247        Ok(v)
248    }
249
250    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
251        let hash = from_bytes_option(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
252        Ok(Self {
253            context,
254            deletion_set: DeletionSet::new(),
255            updates: BTreeMap::new(),
256            stored_hash: hash,
257            hash: Mutex::new(hash),
258        })
259    }
260
261    fn rollback(&mut self) {
262        self.deletion_set.rollback();
263        self.updates.clear();
264        *self.hash.get_mut().unwrap() = self.stored_hash;
265    }
266
267    async fn has_pending_changes(&self) -> bool {
268        if self.deletion_set.has_pending_changes() {
269            return true;
270        }
271        if !self.updates.is_empty() {
272            return true;
273        }
274        let hash = self.hash.lock().unwrap();
275        self.stored_hash != *hash
276    }
277
278    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
279        let mut delete_view = false;
280        if self.deletion_set.delete_storage_first {
281            delete_view = true;
282            batch.delete_key_prefix(self.context.base_key().bytes.clone());
283            for (index, update) in self.updates.iter() {
284                if let Update::Set(value) = update {
285                    let key = self
286                        .context
287                        .base_key()
288                        .base_tag_index(KeyTag::Index as u8, index);
289                    batch.put_key_value_bytes(key, value.clone());
290                    delete_view = false;
291                }
292            }
293        } else {
294            for index in self.deletion_set.deleted_prefixes.iter() {
295                let key = self
296                    .context
297                    .base_key()
298                    .base_tag_index(KeyTag::Index as u8, index);
299                batch.delete_key_prefix(key);
300            }
301            for (index, update) in self.updates.iter() {
302                let key = self
303                    .context
304                    .base_key()
305                    .base_tag_index(KeyTag::Index as u8, index);
306                match update {
307                    Update::Removed => batch.delete_key(key),
308                    Update::Set(value) => batch.put_key_value_bytes(key, value.clone()),
309                }
310            }
311        }
312        let hash = *self.hash.lock().unwrap();
313        if self.stored_hash != hash {
314            let key = self.context.base_key().base_tag(KeyTag::Hash as u8);
315            match hash {
316                None => batch.delete_key(key),
317                Some(hash) => batch.put_key_value(key, &hash)?,
318            }
319        }
320        Ok(delete_view)
321    }
322
323    fn post_save(&mut self) {
324        self.deletion_set.delete_storage_first = false;
325        self.deletion_set.deleted_prefixes.clear();
326        self.updates.clear();
327        let hash = *self.hash.lock().unwrap();
328        self.stored_hash = hash;
329    }
330
331    fn clear(&mut self) {
332        self.deletion_set.clear();
333        self.updates.clear();
334        *self.hash.get_mut().unwrap() = None;
335    }
336}
337
338impl<C: Context> ClonableView for KeyValueStoreView<C> {
339    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
340        Ok(KeyValueStoreView {
341            context: self.context.clone(),
342            deletion_set: self.deletion_set.clone(),
343            updates: self.updates.clone(),
344            stored_hash: self.stored_hash,
345            hash: Mutex::new(*self.hash.get_mut().unwrap()),
346        })
347    }
348}
349
350impl<C: Context> KeyValueStoreView<C> {
351    fn max_key_size(&self) -> usize {
352        let prefix_len = self.context.base_key().bytes.len();
353        <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE - 1 - prefix_len
354    }
355
356    /// Applies the function f over all indices. If the function f returns
357    /// false, then the loop ends prematurely.
358    /// ```rust
359    /// # tokio_test::block_on(async {
360    /// # use linera_views::context::MemoryContext;
361    /// # use linera_views::key_value_store_view::KeyValueStoreView;
362    /// # use linera_views::views::View;
363    /// # let context = MemoryContext::new_for_testing(());
364    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
365    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
366    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
367    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
368    /// let mut count = 0;
369    /// view.for_each_index_while(|_key| {
370    ///     count += 1;
371    ///     Ok(count < 2)
372    /// })
373    /// .await
374    /// .unwrap();
375    /// assert_eq!(count, 2);
376    /// # })
377    /// ```
378    pub async fn for_each_index_while<F>(&self, mut f: F) -> Result<(), ViewError>
379    where
380        F: FnMut(&[u8]) -> Result<bool, ViewError> + Send,
381    {
382        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
383        let mut updates = self.updates.iter();
384        let mut update = updates.next();
385        if !self.deletion_set.delete_storage_first {
386            let mut suffix_closed_set =
387                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
388            for index in self
389                .context
390                .store()
391                .find_keys_by_prefix(&key_prefix)
392                .await?
393            {
394                loop {
395                    match update {
396                        Some((key, value)) if key <= &index => {
397                            if let Update::Set(_) = value {
398                                if !f(key)? {
399                                    return Ok(());
400                                }
401                            }
402                            update = updates.next();
403                            if key == &index {
404                                break;
405                            }
406                        }
407                        _ => {
408                            if !suffix_closed_set.find_key(&index) && !f(&index)? {
409                                return Ok(());
410                            }
411                            break;
412                        }
413                    }
414                }
415            }
416        }
417        while let Some((key, value)) = update {
418            if let Update::Set(_) = value {
419                if !f(key)? {
420                    return Ok(());
421                }
422            }
423            update = updates.next();
424        }
425        Ok(())
426    }
427
428    /// Applies the function f over all indices.
429    /// ```rust
430    /// # tokio_test::block_on(async {
431    /// # use linera_views::context::MemoryContext;
432    /// # use linera_views::key_value_store_view::KeyValueStoreView;
433    /// # use linera_views::views::View;
434    /// # let context = MemoryContext::new_for_testing(());
435    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
436    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
437    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
438    /// view.insert(vec![0, 3], vec![0]).await.unwrap();
439    /// let mut count = 0;
440    /// view.for_each_index(|_key| {
441    ///     count += 1;
442    ///     Ok(())
443    /// })
444    /// .await
445    /// .unwrap();
446    /// assert_eq!(count, 3);
447    /// # })
448    /// ```
449    pub async fn for_each_index<F>(&self, mut f: F) -> Result<(), ViewError>
450    where
451        F: FnMut(&[u8]) -> Result<(), ViewError> + Send,
452    {
453        self.for_each_index_while(|key| {
454            f(key)?;
455            Ok(true)
456        })
457        .await
458    }
459
460    /// Applies the function f over all index/value pairs.
461    /// If the function f returns false then the loop ends prematurely.
462    /// ```rust
463    /// # tokio_test::block_on(async {
464    /// # use linera_views::context::MemoryContext;
465    /// # use linera_views::key_value_store_view::KeyValueStoreView;
466    /// # use linera_views::views::View;
467    /// # let context = MemoryContext::new_for_testing(());
468    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
469    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
470    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
471    /// let mut values = Vec::new();
472    /// view.for_each_index_value_while(|_key, value| {
473    ///     values.push(value.to_vec());
474    ///     Ok(values.len() < 1)
475    /// })
476    /// .await
477    /// .unwrap();
478    /// assert_eq!(values, vec![vec![0]]);
479    /// # })
480    /// ```
481    pub async fn for_each_index_value_while<F>(&self, mut f: F) -> Result<(), ViewError>
482    where
483        F: FnMut(&[u8], &[u8]) -> Result<bool, ViewError> + Send,
484    {
485        let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
486        let mut updates = self.updates.iter();
487        let mut update = updates.next();
488        if !self.deletion_set.delete_storage_first {
489            let mut suffix_closed_set =
490                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
491            for entry in self
492                .context
493                .store()
494                .find_key_values_by_prefix(&key_prefix)
495                .await?
496            {
497                let (index, index_val) = entry;
498                loop {
499                    match update {
500                        Some((key, value)) if key <= &index => {
501                            if let Update::Set(value) = value {
502                                if !f(key, value)? {
503                                    return Ok(());
504                                }
505                            }
506                            update = updates.next();
507                            if key == &index {
508                                break;
509                            }
510                        }
511                        _ => {
512                            if !suffix_closed_set.find_key(&index) && !f(&index, &index_val)? {
513                                return Ok(());
514                            }
515                            break;
516                        }
517                    }
518                }
519            }
520        }
521        while let Some((key, value)) = update {
522            if let Update::Set(value) = value {
523                if !f(key, value)? {
524                    return Ok(());
525                }
526            }
527            update = updates.next();
528        }
529        Ok(())
530    }
531
532    /// Applies the function f over all index/value pairs.
533    /// ```rust
534    /// # tokio_test::block_on(async {
535    /// # use linera_views::context::MemoryContext;
536    /// # use linera_views::key_value_store_view::KeyValueStoreView;
537    /// # use linera_views::views::View;
538    /// # let context = MemoryContext::new_for_testing(());
539    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
540    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
541    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
542    /// let mut part_keys = Vec::new();
543    /// view.for_each_index_while(|key| {
544    ///     part_keys.push(key.to_vec());
545    ///     Ok(part_keys.len() < 1)
546    /// })
547    /// .await
548    /// .unwrap();
549    /// assert_eq!(part_keys, vec![vec![0, 1]]);
550    /// # })
551    /// ```
552    pub async fn for_each_index_value<F>(&self, mut f: F) -> Result<(), ViewError>
553    where
554        F: FnMut(&[u8], &[u8]) -> Result<(), ViewError> + Send,
555    {
556        self.for_each_index_value_while(|key, value| {
557            f(key, value)?;
558            Ok(true)
559        })
560        .await
561    }
562
563    /// Returns the list of indices in lexicographic order.
564    /// ```rust
565    /// # tokio_test::block_on(async {
566    /// # use linera_views::context::MemoryContext;
567    /// # use linera_views::key_value_store_view::KeyValueStoreView;
568    /// # use linera_views::views::View;
569    /// # let context = MemoryContext::new_for_testing(());
570    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
571    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
572    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
573    /// let indices = view.indices().await.unwrap();
574    /// assert_eq!(indices, vec![vec![0, 1], vec![0, 2]]);
575    /// # })
576    /// ```
577    pub async fn indices(&self) -> Result<Vec<Vec<u8>>, ViewError> {
578        let mut indices = Vec::new();
579        self.for_each_index(|index| {
580            indices.push(index.to_vec());
581            Ok(())
582        })
583        .await?;
584        Ok(indices)
585    }
586
587    /// Returns the list of indices and values in lexicographic order.
588    /// ```rust
589    /// # tokio_test::block_on(async {
590    /// # use linera_views::context::MemoryContext;
591    /// # use linera_views::key_value_store_view::KeyValueStoreView;
592    /// # use linera_views::views::View;
593    /// # let context = MemoryContext::new_for_testing(());
594    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
595    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
596    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
597    /// let key_values = view.indices().await.unwrap();
598    /// assert_eq!(key_values, vec![vec![0, 1], vec![0, 2]]);
599    /// # })
600    /// ```
601    pub async fn index_values(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
602        let mut index_values = Vec::new();
603        self.for_each_index_value(|index, value| {
604            index_values.push((index.to_vec(), value.to_vec()));
605            Ok(())
606        })
607        .await?;
608        Ok(index_values)
609    }
610
611    /// Returns the number of entries.
612    /// ```rust
613    /// # tokio_test::block_on(async {
614    /// # use linera_views::context::MemoryContext;
615    /// # use linera_views::key_value_store_view::KeyValueStoreView;
616    /// # use linera_views::views::View;
617    /// # let context = MemoryContext::new_for_testing(());
618    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
619    /// view.insert(vec![0, 1], vec![0]).await.unwrap();
620    /// view.insert(vec![0, 2], vec![0]).await.unwrap();
621    /// let count = view.count().await.unwrap();
622    /// assert_eq!(count, 2);
623    /// # })
624    /// ```
625    pub async fn count(&self) -> Result<usize, ViewError> {
626        let mut count = 0;
627        self.for_each_index(|_index| {
628            count += 1;
629            Ok(())
630        })
631        .await?;
632        Ok(count)
633    }
634
635    /// Obtains the value at the given index, if any.
636    /// ```rust
637    /// # tokio_test::block_on(async {
638    /// # use linera_views::context::MemoryContext;
639    /// # use linera_views::key_value_store_view::KeyValueStoreView;
640    /// # use linera_views::views::View;
641    /// # let context = MemoryContext::new_for_testing(());
642    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
643    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
644    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![42]));
645    /// assert_eq!(view.get(&[0, 2]).await.unwrap(), None);
646    /// # })
647    /// ```
648    pub async fn get(&self, index: &[u8]) -> Result<Option<Vec<u8>>, ViewError> {
649        #[cfg(with_metrics)]
650        let _latency = metrics::KEY_VALUE_STORE_VIEW_GET_LATENCY.measure_latency();
651        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
652        if let Some(update) = self.updates.get(index) {
653            let value = match update {
654                Update::Removed => None,
655                Update::Set(value) => Some(value.clone()),
656            };
657            return Ok(value);
658        }
659        if self.deletion_set.contains_prefix_of(index) {
660            return Ok(None);
661        }
662        let key = self
663            .context
664            .base_key()
665            .base_tag_index(KeyTag::Index as u8, index);
666        Ok(self.context.store().read_value_bytes(&key).await?)
667    }
668
669    /// Tests whether the store contains a specific index.
670    /// ```rust
671    /// # tokio_test::block_on(async {
672    /// # use linera_views::context::MemoryContext;
673    /// # use linera_views::key_value_store_view::KeyValueStoreView;
674    /// # use linera_views::views::View;
675    /// # let context = MemoryContext::new_for_testing(());
676    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
677    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
678    /// assert!(view.contains_key(&[0, 1]).await.unwrap());
679    /// assert!(!view.contains_key(&[0, 2]).await.unwrap());
680    /// # })
681    /// ```
682    pub async fn contains_key(&self, index: &[u8]) -> Result<bool, ViewError> {
683        #[cfg(with_metrics)]
684        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEY_LATENCY.measure_latency();
685        ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
686        if let Some(update) = self.updates.get(index) {
687            let test = match update {
688                Update::Removed => false,
689                Update::Set(_value) => true,
690            };
691            return Ok(test);
692        }
693        if self.deletion_set.contains_prefix_of(index) {
694            return Ok(false);
695        }
696        let key = self
697            .context
698            .base_key()
699            .base_tag_index(KeyTag::Index as u8, index);
700        Ok(self.context.store().contains_key(&key).await?)
701    }
702
703    /// Tests whether the view contains a range of indices
704    /// ```rust
705    /// # tokio_test::block_on(async {
706    /// # use linera_views::context::MemoryContext;
707    /// # use linera_views::key_value_store_view::KeyValueStoreView;
708    /// # use linera_views::views::View;
709    /// # let context = MemoryContext::new_for_testing(());
710    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
711    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
712    /// let keys = vec![vec![0, 1], vec![0, 2]];
713    /// let results = view.contains_keys(&keys).await.unwrap();
714    /// assert_eq!(results, vec![true, false]);
715    /// # })
716    /// ```
717    pub async fn contains_keys(&self, indices: &[Vec<u8>]) -> Result<Vec<bool>, ViewError> {
718        #[cfg(with_metrics)]
719        let _latency = metrics::KEY_VALUE_STORE_VIEW_CONTAINS_KEYS_LATENCY.measure_latency();
720        let mut results = Vec::with_capacity(indices.len());
721        let mut missed_indices = Vec::new();
722        let mut vector_query = Vec::new();
723        for (i, index) in indices.iter().enumerate() {
724            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
725            if let Some(update) = self.updates.get(index) {
726                let value = match update {
727                    Update::Removed => false,
728                    Update::Set(_) => true,
729                };
730                results.push(value);
731            } else {
732                results.push(false);
733                if !self.deletion_set.contains_prefix_of(index) {
734                    missed_indices.push(i);
735                    let key = self
736                        .context
737                        .base_key()
738                        .base_tag_index(KeyTag::Index as u8, index);
739                    vector_query.push(key);
740                }
741            }
742        }
743        let values = self.context.store().contains_keys(&vector_query).await?;
744        for (i, value) in missed_indices.into_iter().zip(values) {
745            results[i] = value;
746        }
747        Ok(results)
748    }
749
750    /// Obtains the values of a range of indices
751    /// ```rust
752    /// # tokio_test::block_on(async {
753    /// # use linera_views::context::MemoryContext;
754    /// # use linera_views::key_value_store_view::KeyValueStoreView;
755    /// # use linera_views::views::View;
756    /// # let context = MemoryContext::new_for_testing(());
757    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
758    /// view.insert(vec![0, 1], vec![42]).await.unwrap();
759    /// assert_eq!(
760    ///     view.multi_get(&[vec![0, 1], vec![0, 2]]).await.unwrap(),
761    ///     vec![Some(vec![42]), None]
762    /// );
763    /// # })
764    /// ```
765    pub async fn multi_get(&self, indices: &[Vec<u8>]) -> Result<Vec<Option<Vec<u8>>>, ViewError> {
766        #[cfg(with_metrics)]
767        let _latency = metrics::KEY_VALUE_STORE_VIEW_MULTI_GET_LATENCY.measure_latency();
768        let mut result = Vec::with_capacity(indices.len());
769        let mut missed_indices = Vec::new();
770        let mut vector_query = Vec::new();
771        for (i, index) in indices.iter().enumerate() {
772            ensure!(index.len() <= self.max_key_size(), ViewError::KeyTooLong);
773            if let Some(update) = self.updates.get(index) {
774                let value = match update {
775                    Update::Removed => None,
776                    Update::Set(value) => Some(value.clone()),
777                };
778                result.push(value);
779            } else {
780                result.push(None);
781                if !self.deletion_set.contains_prefix_of(index) {
782                    missed_indices.push(i);
783                    let key = self
784                        .context
785                        .base_key()
786                        .base_tag_index(KeyTag::Index as u8, index);
787                    vector_query.push(key);
788                }
789            }
790        }
791        let values = self
792            .context
793            .store()
794            .read_multi_values_bytes(&vector_query)
795            .await?;
796        for (i, value) in missed_indices.into_iter().zip(values) {
797            result[i] = value;
798        }
799        Ok(result)
800    }
801
802    /// Applies the given batch of `crate::common::WriteOperation`.
803    /// ```rust
804    /// # tokio_test::block_on(async {
805    /// # use linera_views::context::MemoryContext;
806    /// # use linera_views::key_value_store_view::KeyValueStoreView;
807    /// # use linera_views::batch::Batch;
808    /// # use linera_views::views::View;
809    /// # let context = MemoryContext::new_for_testing(());
810    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
811    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
812    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
813    /// let mut batch = Batch::new();
814    /// batch.delete_key_prefix(vec![0]);
815    /// view.write_batch(batch).await.unwrap();
816    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
817    /// assert_eq!(key_values, vec![]);
818    /// # })
819    /// ```
820    pub async fn write_batch(&mut self, batch: Batch) -> Result<(), ViewError> {
821        #[cfg(with_metrics)]
822        let _latency = metrics::KEY_VALUE_STORE_VIEW_WRITE_BATCH_LATENCY.measure_latency();
823        *self.hash.get_mut().unwrap() = None;
824        let max_key_size = self.max_key_size();
825        for operation in batch.operations {
826            match operation {
827                WriteOperation::Delete { key } => {
828                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
829                    if self.deletion_set.contains_prefix_of(&key) {
830                        // Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
831                        self.updates.remove(&key);
832                    } else {
833                        self.updates.insert(key, Update::Removed);
834                    }
835                }
836                WriteOperation::Put { key, value } => {
837                    ensure!(key.len() <= max_key_size, ViewError::KeyTooLong);
838                    self.updates.insert(key, Update::Set(value));
839                }
840                WriteOperation::DeletePrefix { key_prefix } => {
841                    ensure!(key_prefix.len() <= max_key_size, ViewError::KeyTooLong);
842                    let key_list = self
843                        .updates
844                        .range(get_key_range_for_prefix(key_prefix.clone()))
845                        .map(|x| x.0.to_vec())
846                        .collect::<Vec<_>>();
847                    for key in key_list {
848                        self.updates.remove(&key);
849                    }
850                    self.deletion_set.insert_key_prefix(key_prefix);
851                }
852            }
853        }
854        Ok(())
855    }
856
857    /// Sets or inserts a value.
858    /// ```rust
859    /// # tokio_test::block_on(async {
860    /// # use linera_views::context::MemoryContext;
861    /// # use linera_views::key_value_store_view::KeyValueStoreView;
862    /// # use linera_views::views::View;
863    /// # let context = MemoryContext::new_for_testing(());
864    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
865    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
866    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), Some(vec![34]));
867    /// # })
868    /// ```
869    pub async fn insert(&mut self, index: Vec<u8>, value: Vec<u8>) -> Result<(), ViewError> {
870        let mut batch = Batch::new();
871        batch.put_key_value_bytes(index, value);
872        self.write_batch(batch).await
873    }
874
875    /// Removes a value. If absent then the action has no effect.
876    /// ```rust
877    /// # tokio_test::block_on(async {
878    /// # use linera_views::context::MemoryContext;
879    /// # use linera_views::key_value_store_view::KeyValueStoreView;
880    /// # use linera_views::views::View;
881    /// # let context = MemoryContext::new_for_testing(());
882    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
883    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
884    /// view.remove(vec![0, 1]).await.unwrap();
885    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
886    /// # })
887    /// ```
888    pub async fn remove(&mut self, index: Vec<u8>) -> Result<(), ViewError> {
889        let mut batch = Batch::new();
890        batch.delete_key(index);
891        self.write_batch(batch).await
892    }
893
894    /// Deletes a key prefix.
895    /// ```rust
896    /// # tokio_test::block_on(async {
897    /// # use linera_views::context::MemoryContext;
898    /// # use linera_views::key_value_store_view::KeyValueStoreView;
899    /// # use linera_views::views::View;
900    /// # let context = MemoryContext::new_for_testing(());
901    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
902    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
903    /// view.remove_by_prefix(vec![0]).await.unwrap();
904    /// assert_eq!(view.get(&[0, 1]).await.unwrap(), None);
905    /// # })
906    /// ```
907    pub async fn remove_by_prefix(&mut self, key_prefix: Vec<u8>) -> Result<(), ViewError> {
908        let mut batch = Batch::new();
909        batch.delete_key_prefix(key_prefix);
910        self.write_batch(batch).await
911    }
912
913    /// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
914    /// ```rust
915    /// # tokio_test::block_on(async {
916    /// # use linera_views::context::MemoryContext;
917    /// # use linera_views::key_value_store_view::KeyValueStoreView;
918    /// # use linera_views::views::View;
919    /// # let context = MemoryContext::new_for_testing(());
920    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
921    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
922    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
923    /// let keys = view.find_keys_by_prefix(&[0]).await.unwrap();
924    /// assert_eq!(keys, vec![vec![1]]);
925    /// # })
926    /// ```
927    pub async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, ViewError> {
928        #[cfg(with_metrics)]
929        let _latency = metrics::KEY_VALUE_STORE_VIEW_FIND_KEYS_BY_PREFIX_LATENCY.measure_latency();
930        ensure!(
931            key_prefix.len() <= self.max_key_size(),
932            ViewError::KeyTooLong
933        );
934        let len = key_prefix.len();
935        let key_prefix_full = self
936            .context
937            .base_key()
938            .base_tag_index(KeyTag::Index as u8, key_prefix);
939        let mut keys = Vec::new();
940        let key_prefix_upper = get_upper_bound(key_prefix);
941        let mut updates = self
942            .updates
943            .range((Included(key_prefix.to_vec()), key_prefix_upper));
944        let mut update = updates.next();
945        if !self.deletion_set.delete_storage_first {
946            let mut suffix_closed_set =
947                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
948            for key in self
949                .context
950                .store()
951                .find_keys_by_prefix(&key_prefix_full)
952                .await?
953            {
954                loop {
955                    match update {
956                        Some((update_key, update_value))
957                            if &update_key[len..] <= key.as_slice() =>
958                        {
959                            if let Update::Set(_) = update_value {
960                                keys.push(update_key[len..].to_vec());
961                            }
962                            update = updates.next();
963                            if update_key[len..] == key[..] {
964                                break;
965                            }
966                        }
967                        _ => {
968                            let mut key_with_prefix = key_prefix.to_vec();
969                            key_with_prefix.extend_from_slice(&key);
970                            if !suffix_closed_set.find_key(&key_with_prefix) {
971                                keys.push(key);
972                            }
973                            break;
974                        }
975                    }
976                }
977            }
978        }
979        while let Some((update_key, update_value)) = update {
980            if let Update::Set(_) = update_value {
981                let update_key = update_key[len..].to_vec();
982                keys.push(update_key);
983            }
984            update = updates.next();
985        }
986        Ok(keys)
987    }
988
989    /// Iterates over all the key-value pairs, for keys matching the given prefix. The
990    /// prefix is not included in the returned keys.
991    /// ```rust
992    /// # tokio_test::block_on(async {
993    /// # use linera_views::context::MemoryContext;
994    /// # use linera_views::key_value_store_view::KeyValueStoreView;
995    /// # use linera_views::views::View;
996    /// # let context = MemoryContext::new_for_testing(());
997    /// let mut view = KeyValueStoreView::load(context).await.unwrap();
998    /// view.insert(vec![0, 1], vec![34]).await.unwrap();
999    /// view.insert(vec![3, 4], vec![42]).await.unwrap();
1000    /// let key_values = view.find_key_values_by_prefix(&[0]).await.unwrap();
1001    /// assert_eq!(key_values, vec![(vec![1], vec![34])]);
1002    /// # })
1003    /// ```
1004    pub async fn find_key_values_by_prefix(
1005        &self,
1006        key_prefix: &[u8],
1007    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
1008        #[cfg(with_metrics)]
1009        let _latency =
1010            metrics::KEY_VALUE_STORE_VIEW_FIND_KEY_VALUES_BY_PREFIX_LATENCY.measure_latency();
1011        ensure!(
1012            key_prefix.len() <= self.max_key_size(),
1013            ViewError::KeyTooLong
1014        );
1015        let len = key_prefix.len();
1016        let key_prefix_full = self
1017            .context
1018            .base_key()
1019            .base_tag_index(KeyTag::Index as u8, key_prefix);
1020        let mut key_values = Vec::new();
1021        let key_prefix_upper = get_upper_bound(key_prefix);
1022        let mut updates = self
1023            .updates
1024            .range((Included(key_prefix.to_vec()), key_prefix_upper));
1025        let mut update = updates.next();
1026        if !self.deletion_set.delete_storage_first {
1027            let mut suffix_closed_set =
1028                SuffixClosedSetIterator::new(0, self.deletion_set.deleted_prefixes.iter());
1029            for entry in self
1030                .context
1031                .store()
1032                .find_key_values_by_prefix(&key_prefix_full)
1033                .await?
1034            {
1035                let (key, value) = entry;
1036                loop {
1037                    match update {
1038                        Some((update_key, update_value)) if update_key[len..] <= key[..] => {
1039                            if let Update::Set(update_value) = update_value {
1040                                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1041                                key_values.push(key_value);
1042                            }
1043                            update = updates.next();
1044                            if update_key[len..] == key[..] {
1045                                break;
1046                            }
1047                        }
1048                        _ => {
1049                            let mut key_with_prefix = key_prefix.to_vec();
1050                            key_with_prefix.extend_from_slice(&key);
1051                            if !suffix_closed_set.find_key(&key_with_prefix) {
1052                                key_values.push((key, value));
1053                            }
1054                            break;
1055                        }
1056                    }
1057                }
1058            }
1059        }
1060        while let Some((update_key, update_value)) = update {
1061            if let Update::Set(update_value) = update_value {
1062                let key_value = (update_key[len..].to_vec(), update_value.to_vec());
1063                key_values.push(key_value);
1064            }
1065            update = updates.next();
1066        }
1067        Ok(key_values)
1068    }
1069
1070    async fn compute_hash(&self) -> Result<<sha3::Sha3_256 as Hasher>::Output, ViewError> {
1071        #[cfg(with_metrics)]
1072        let _hash_latency = metrics::KEY_VALUE_STORE_VIEW_HASH_LATENCY.measure_latency();
1073        let mut hasher = sha3::Sha3_256::default();
1074        let mut count = 0u32;
1075        self.for_each_index_value(|index, value| -> Result<(), ViewError> {
1076            count += 1;
1077            hasher.update_with_bytes(index)?;
1078            hasher.update_with_bytes(value)?;
1079            Ok(())
1080        })
1081        .await?;
1082        hasher.update_with_bcs_bytes(&count)?;
1083        Ok(hasher.finalize())
1084    }
1085}
1086
1087impl<C: Context> HashableView for KeyValueStoreView<C> {
1088    type Hasher = sha3::Sha3_256;
1089
1090    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1091        let hash = *self.hash.get_mut().unwrap();
1092        match hash {
1093            Some(hash) => Ok(hash),
1094            None => {
1095                let new_hash = self.compute_hash().await?;
1096                let hash = self.hash.get_mut().unwrap();
1097                *hash = Some(new_hash);
1098                Ok(new_hash)
1099            }
1100        }
1101    }
1102
1103    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
1104        let hash = *self.hash.lock().unwrap();
1105        match hash {
1106            Some(hash) => Ok(hash),
1107            None => {
1108                let new_hash = self.compute_hash().await?;
1109                let mut hash = self.hash.lock().unwrap();
1110                *hash = Some(new_hash);
1111                Ok(new_hash)
1112            }
1113        }
1114    }
1115}
1116
1117/// Type wrapping `KeyValueStoreView` while memoizing the hash.
1118pub type HashedKeyValueStoreView<C> =
1119    WrappedHashableContainerView<C, KeyValueStoreView<C>, HasherOutput>;
1120
1121/// Wrapper around `KeyValueStoreView` to compute hashes based on the history of changes.
1122pub type HistoricallyHashedKeyValueStoreView<C> = HistoricallyHashableView<C, KeyValueStoreView<C>>;
1123
1124/// A virtual DB client using a `KeyValueStoreView` as a backend (testing only).
1125#[cfg(with_testing)]
1126#[derive(Debug, Clone)]
1127pub struct ViewContainer<C> {
1128    view: Arc<RwLock<KeyValueStoreView<C>>>,
1129}
1130
1131#[cfg(with_testing)]
1132impl<C> WithError for ViewContainer<C> {
1133    type Error = ViewContainerError;
1134}
1135
1136#[cfg(with_testing)]
1137/// The error type for [`ViewContainer`] operations.
1138#[derive(Error, Debug)]
1139pub enum ViewContainerError {
1140    /// View error.
1141    #[error(transparent)]
1142    ViewError(#[from] ViewError),
1143
1144    /// BCS serialization error.
1145    #[error(transparent)]
1146    BcsError(#[from] bcs::Error),
1147}
1148
1149#[cfg(with_testing)]
1150impl KeyValueStoreError for ViewContainerError {
1151    const BACKEND: &'static str = "view_container";
1152}
1153
1154#[cfg(with_testing)]
1155impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
1156    const MAX_KEY_SIZE: usize = <C::Store as ReadableKeyValueStore>::MAX_KEY_SIZE;
1157
1158    fn max_stream_queries(&self) -> usize {
1159        1
1160    }
1161
1162    fn root_key(&self) -> Result<Vec<u8>, ViewContainerError> {
1163        Ok(Vec::new())
1164    }
1165
1166    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, ViewContainerError> {
1167        let view = self.view.read().await;
1168        Ok(view.get(key).await?)
1169    }
1170
1171    async fn contains_key(&self, key: &[u8]) -> Result<bool, ViewContainerError> {
1172        let view = self.view.read().await;
1173        Ok(view.contains_key(key).await?)
1174    }
1175
1176    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, ViewContainerError> {
1177        let view = self.view.read().await;
1178        Ok(view.contains_keys(keys).await?)
1179    }
1180
1181    async fn read_multi_values_bytes(
1182        &self,
1183        keys: &[Vec<u8>],
1184    ) -> Result<Vec<Option<Vec<u8>>>, ViewContainerError> {
1185        let view = self.view.read().await;
1186        Ok(view.multi_get(keys).await?)
1187    }
1188
1189    async fn find_keys_by_prefix(
1190        &self,
1191        key_prefix: &[u8],
1192    ) -> Result<Vec<Vec<u8>>, ViewContainerError> {
1193        let view = self.view.read().await;
1194        Ok(view.find_keys_by_prefix(key_prefix).await?)
1195    }
1196
1197    async fn find_key_values_by_prefix(
1198        &self,
1199        key_prefix: &[u8],
1200    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewContainerError> {
1201        let view = self.view.read().await;
1202        Ok(view.find_key_values_by_prefix(key_prefix).await?)
1203    }
1204}
1205
1206#[cfg(with_testing)]
1207impl<C: Context> WritableKeyValueStore for ViewContainer<C> {
1208    const MAX_VALUE_SIZE: usize = <C::Store as WritableKeyValueStore>::MAX_VALUE_SIZE;
1209
1210    async fn write_batch(&self, batch: Batch) -> Result<(), ViewContainerError> {
1211        let mut view = self.view.write().await;
1212        view.write_batch(batch).await?;
1213        let mut batch = Batch::new();
1214        view.pre_save(&mut batch)?;
1215        view.post_save();
1216        view.context()
1217            .store()
1218            .write_batch(batch)
1219            .await
1220            .map_err(ViewError::from)?;
1221        Ok(())
1222    }
1223
1224    async fn clear_journal(&self) -> Result<(), ViewContainerError> {
1225        Ok(())
1226    }
1227}
1228
1229#[cfg(with_testing)]
1230impl<C: Context> ViewContainer<C> {
1231    /// Creates a [`ViewContainer`].
1232    pub async fn new(context: C) -> Result<Self, ViewError> {
1233        let view = KeyValueStoreView::load(context).await?;
1234        let view = Arc::new(RwLock::new(view));
1235        Ok(Self { view })
1236    }
1237}