Skip to main content

linera_views/
store.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This provides the trait definitions for the stores.
5
6use std::{fmt::Debug, future::Future};
7
8use serde::{de::DeserializeOwned, Serialize};
9
10#[cfg(with_testing)]
11use crate::random::generate_test_namespace;
12use crate::{
13    batch::{Batch, SimplifiedBatch},
14    common::from_bytes_option,
15    ViewError,
16};
17
18/// The error type for the key-value stores.
19pub trait KeyValueStoreError:
20    std::error::Error + From<bcs::Error> + Debug + Send + Sync + 'static
21{
22    /// The name of the backend.
23    const BACKEND: &'static str;
24
25    /// Returns `true` if this error represents a journal resolution failure,
26    /// which may leave storage in an inconsistent state requiring a view reload.
27    fn must_reload_view(&self) -> bool {
28        false
29    }
30}
31
32impl<E: KeyValueStoreError> From<E> for ViewError {
33    fn from(error: E) -> Self {
34        let must_reload_view = error.must_reload_view();
35        Self::StoreError {
36            backend: E::BACKEND,
37            error: Box::new(error),
38            must_reload_view,
39        }
40    }
41}
42
43/// Define an associated [`KeyValueStoreError`].
44pub trait WithError {
45    /// The error type.
46    type Error: KeyValueStoreError;
47}
48
49/// Asynchronous read key-value operations.
50#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
51pub trait ReadableKeyValueStore: WithError {
52    /// The maximal size of keys that can be stored.
53    const MAX_KEY_SIZE: usize;
54
55    /// Retrieve the number of stream queries.
56    fn max_stream_queries(&self) -> usize;
57
58    /// Gets the root key of the store.
59    fn root_key(&self) -> Result<Vec<u8>, Self::Error>;
60
61    /// Retrieves a `Vec<u8>` from the database using the provided `key`.
62    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
63
64    /// Tests whether a key exists in the database
65    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error>;
66
67    /// Tests whether a list of keys exist in the database
68    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error>;
69
70    /// Retrieves multiple `Vec<u8>` from the database using the provided `keys`.
71    async fn read_multi_values_bytes(
72        &self,
73        keys: &[Vec<u8>],
74    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;
75
76    /// Finds the `key` matching the prefix. The prefix is not included in the returned keys.
77    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
78
79    /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
80    async fn find_key_values_by_prefix(
81        &self,
82        key_prefix: &[u8],
83    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error>;
84
85    // We can't use `async fn` here in the below implementations due to
86    // https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed
87    // we can revert them to `async fn` syntax, which is neater.
88
89    /// Reads a single `key` and deserializes the result if present.
90    fn read_value<V: DeserializeOwned>(
91        &self,
92        key: &[u8],
93    ) -> impl Future<Output = Result<Option<V>, Self::Error>> {
94        async { Ok(from_bytes_option(&self.read_value_bytes(key).await?)?) }
95    }
96
97    /// Reads multiple `keys` and deserializes the results if present.
98    fn read_multi_values<V: DeserializeOwned + Send + Sync>(
99        &self,
100        keys: &[Vec<u8>],
101    ) -> impl Future<Output = Result<Vec<Option<V>>, Self::Error>> {
102        async {
103            let mut values = Vec::with_capacity(keys.len());
104            for entry in self.read_multi_values_bytes(keys).await? {
105                values.push(from_bytes_option(&entry)?);
106            }
107            Ok(values)
108        }
109    }
110}
111
112/// Asynchronous write key-value operations.
113#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
114pub trait WritableKeyValueStore: WithError {
115    /// The maximal size of values that can be stored.
116    const MAX_VALUE_SIZE: usize;
117
118    /// Writes the `batch` in the database.
119    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error>;
120
121    /// Clears any journal entry that may remain.
122    /// The journal is located at the `root_key`.
123    async fn clear_journal(&self) -> Result<(), Self::Error>;
124}
125
126/// Asynchronous direct write key-value operations with simplified batch.
127///
128/// Some backend cannot implement `WritableKeyValueStore` directly and will require
129/// journaling.
130#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
131pub trait DirectWritableKeyValueStore: WithError {
132    /// The maximal number of items in a batch.
133    const MAX_BATCH_SIZE: usize;
134
135    /// The maximal number of bytes of a batch.
136    const MAX_BATCH_TOTAL_SIZE: usize;
137
138    /// The maximal size of values that can be stored.
139    const MAX_VALUE_SIZE: usize;
140
141    /// The batch type.
142    type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;
143
144    /// Writes the batch to the database.
145    async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
146}
147
148/// The definition of a key-value database.
149#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
150pub trait KeyValueDatabase: WithError + linera_base::util::traits::AutoTraits + Sized {
151    /// The configuration needed to interact with a new backend.
152    type Config: Send + Sync;
153
154    /// The result of opening a partition.
155    type Store;
156
157    /// The name of this database.
158    fn get_name() -> String;
159
160    /// Connects to an existing namespace using the given configuration.
161    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error>;
162
163    /// Opens a shared partition starting at `root_key`. It is understood that the
164    /// partition MAY be read and written simultaneously from other clients.
165    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
166
167    /// Opens an exclusive partition starting at `root_key`. It is assumed that the
168    /// partition WILL NOT be read and written simultaneously by other clients.
169    ///
170    /// IMPORTANT: This assumption is not enforced at the moment. However, future
171    /// implementations may choose to return an error if another client is detected.
172    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
173
174    /// Obtains the list of existing namespaces.
175    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;
176
177    /// Lists the root keys of the namespace.
178    /// It is possible that some root keys have no keys.
179    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error>;
180
181    /// Deletes all the existing namespaces.
182    fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
183        async {
184            let namespaces = Self::list_all(config).await?;
185            for namespace in namespaces {
186                Self::delete(config, &namespace).await?;
187            }
188            Ok(())
189        }
190    }
191
192    /// Tests if a given namespace exists.
193    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error>;
194
195    /// Creates a namespace. Returns an error if the namespace exists.
196    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
197
198    /// Deletes the given namespace.
199    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
200
201    /// Initializes a storage if missing and provides it.
202    fn maybe_create_and_connect(
203        config: &Self::Config,
204        namespace: &str,
205    ) -> impl Future<Output = Result<Self, Self::Error>> {
206        async {
207            if !Self::exists(config, namespace).await? {
208                Self::create(config, namespace).await?;
209            }
210            Self::connect(config, namespace).await
211        }
212    }
213
214    /// Creates a new storage. Overwrites it if this namespace already exists.
215    fn recreate_and_connect(
216        config: &Self::Config,
217        namespace: &str,
218    ) -> impl Future<Output = Result<Self, Self::Error>> {
219        async {
220            if Self::exists(config, namespace).await? {
221                Self::delete(config, namespace).await?;
222            }
223            Self::create(config, namespace).await?;
224            Self::connect(config, namespace).await
225        }
226    }
227}
228
229/// A key-value store that can perform both read and direct write operations.
230///
231/// This trait combines the capabilities of [`ReadableKeyValueStore`] and
232/// [`DirectWritableKeyValueStore`], providing a full interface for stores
233/// that can handle simplified batches directly without journaling.
234pub trait DirectKeyValueStore: ReadableKeyValueStore + DirectWritableKeyValueStore {}
235
236impl<T> DirectKeyValueStore for T where T: ReadableKeyValueStore + DirectWritableKeyValueStore {}
237
238/// A key-value store that can perform both read and write operations.
239///
240/// This trait combines the capabilities of [`ReadableKeyValueStore`] and
241/// [`WritableKeyValueStore`], providing a full interface for stores that
242/// can handle complex batches with journaling support.
243pub trait KeyValueStore: ReadableKeyValueStore + WritableKeyValueStore {}
244
245impl<T> KeyValueStore for T where T: ReadableKeyValueStore + WritableKeyValueStore {}
246
247/// The functions needed for testing purposes
248#[cfg(with_testing)]
249pub trait TestKeyValueDatabase: KeyValueDatabase {
250    /// Obtains a test config
251    async fn new_test_config() -> Result<Self::Config, Self::Error>;
252
253    /// Creates a database for testing purposes
254    async fn connect_test_namespace() -> Result<Self, Self::Error> {
255        let config = Self::new_test_config().await?;
256        let namespace = generate_test_namespace();
257        Self::recreate_and_connect(&config, &namespace).await
258    }
259
260    /// Creates a store for testing purposes
261    async fn new_test_store() -> Result<Self::Store, Self::Error> {
262        let database = Self::connect_test_namespace().await?;
263        database.open_shared(&[])
264    }
265}
266
267/// A module containing a dummy store used for caching views.
268pub mod inactive_store {
269    use super::*;
270
271    /// A store which does not actually store anything - used for caching views.
272    pub struct InactiveStore;
273
274    /// An error struct for the inactive store.
275    #[derive(Clone, Copy, Debug)]
276    pub struct InactiveStoreError;
277
278    impl std::fmt::Display for InactiveStoreError {
279        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
280            write!(f, "inactive store error")
281        }
282    }
283
284    impl From<bcs::Error> for InactiveStoreError {
285        fn from(_other: bcs::Error) -> Self {
286            Self
287        }
288    }
289
290    impl std::error::Error for InactiveStoreError {}
291
292    impl KeyValueStoreError for InactiveStoreError {
293        const BACKEND: &'static str = "inactive";
294    }
295
296    impl WithError for InactiveStore {
297        type Error = InactiveStoreError;
298    }
299
300    impl ReadableKeyValueStore for InactiveStore {
301        const MAX_KEY_SIZE: usize = 0;
302
303        fn max_stream_queries(&self) -> usize {
304            0
305        }
306
307        fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
308            panic!("attempt to read from an inactive store!")
309        }
310
311        async fn read_value_bytes(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
312            panic!("attempt to read from an inactive store!")
313        }
314
315        async fn contains_key(&self, _key: &[u8]) -> Result<bool, Self::Error> {
316            panic!("attempt to read from an inactive store!")
317        }
318
319        async fn contains_keys(&self, _keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
320            panic!("attempt to read from an inactive store!")
321        }
322
323        async fn read_multi_values_bytes(
324            &self,
325            _keys: &[Vec<u8>],
326        ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
327            panic!("attempt to read from an inactive store!")
328        }
329
330        async fn find_keys_by_prefix(
331            &self,
332            _key_prefix: &[u8],
333        ) -> Result<Vec<Vec<u8>>, Self::Error> {
334            panic!("attempt to read from an inactive store!")
335        }
336
337        /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
338        async fn find_key_values_by_prefix(
339            &self,
340            _key_prefix: &[u8],
341        ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
342            panic!("attempt to read from an inactive store!")
343        }
344    }
345
346    impl WritableKeyValueStore for InactiveStore {
347        const MAX_VALUE_SIZE: usize = 0;
348
349        async fn write_batch(&self, _batch: Batch) -> Result<(), Self::Error> {
350            panic!("attempt to write to an inactive store!")
351        }
352
353        async fn clear_journal(&self) -> Result<(), Self::Error> {
354            panic!("attempt to write to an inactive store!")
355        }
356    }
357}