1use std::{fmt::Debug, future::Future};
7
8use serde::{de::DeserializeOwned, Serialize};
9
10#[cfg(with_testing)]
11use crate::random::generate_test_namespace;
12use crate::{
13 batch::{Batch, SimplifiedBatch},
14 common::from_bytes_option,
15 ViewError,
16};
17
18pub trait KeyValueStoreError:
20 std::error::Error + From<bcs::Error> + Debug + Send + Sync + 'static
21{
22 const BACKEND: &'static str;
24
25 fn must_reload_view(&self) -> bool {
28 false
29 }
30}
31
32impl<E: KeyValueStoreError> From<E> for ViewError {
33 fn from(error: E) -> Self {
34 let must_reload_view = error.must_reload_view();
35 Self::StoreError {
36 backend: E::BACKEND,
37 error: Box::new(error),
38 must_reload_view,
39 }
40 }
41}
42
43pub trait WithError {
45 type Error: KeyValueStoreError;
47}
48
49#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
51pub trait ReadableKeyValueStore: WithError {
52 const MAX_KEY_SIZE: usize;
54
55 fn max_stream_queries(&self) -> usize;
57
58 fn root_key(&self) -> Result<Vec<u8>, Self::Error>;
60
61 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
63
64 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error>;
66
67 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error>;
69
70 async fn read_multi_values_bytes(
72 &self,
73 keys: &[Vec<u8>],
74 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;
75
76 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
78
79 async fn find_key_values_by_prefix(
81 &self,
82 key_prefix: &[u8],
83 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error>;
84
85 fn read_value<V: DeserializeOwned>(
91 &self,
92 key: &[u8],
93 ) -> impl Future<Output = Result<Option<V>, Self::Error>> {
94 async { Ok(from_bytes_option(&self.read_value_bytes(key).await?)?) }
95 }
96
97 fn read_multi_values<V: DeserializeOwned + Send + Sync>(
99 &self,
100 keys: &[Vec<u8>],
101 ) -> impl Future<Output = Result<Vec<Option<V>>, Self::Error>> {
102 async {
103 let mut values = Vec::with_capacity(keys.len());
104 for entry in self.read_multi_values_bytes(keys).await? {
105 values.push(from_bytes_option(&entry)?);
106 }
107 Ok(values)
108 }
109 }
110}
111
112#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
114pub trait WritableKeyValueStore: WithError {
115 const MAX_VALUE_SIZE: usize;
117
118 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error>;
120
121 async fn clear_journal(&self) -> Result<(), Self::Error>;
124}
125
126#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
131pub trait DirectWritableKeyValueStore: WithError {
132 const MAX_BATCH_SIZE: usize;
134
135 const MAX_BATCH_TOTAL_SIZE: usize;
137
138 const MAX_VALUE_SIZE: usize;
140
141 type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;
143
144 async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
146}
147
148#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
150pub trait KeyValueDatabase: WithError + linera_base::util::traits::AutoTraits + Sized {
151 type Config: Send + Sync;
153
154 type Store;
156
157 fn get_name() -> String;
159
160 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error>;
162
163 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
166
167 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
173
174 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;
176
177 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error>;
180
181 fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
183 async {
184 let namespaces = Self::list_all(config).await?;
185 for namespace in namespaces {
186 Self::delete(config, &namespace).await?;
187 }
188 Ok(())
189 }
190 }
191
192 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error>;
194
195 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
197
198 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
200
201 fn maybe_create_and_connect(
203 config: &Self::Config,
204 namespace: &str,
205 ) -> impl Future<Output = Result<Self, Self::Error>> {
206 async {
207 if !Self::exists(config, namespace).await? {
208 Self::create(config, namespace).await?;
209 }
210 Self::connect(config, namespace).await
211 }
212 }
213
214 fn recreate_and_connect(
216 config: &Self::Config,
217 namespace: &str,
218 ) -> impl Future<Output = Result<Self, Self::Error>> {
219 async {
220 if Self::exists(config, namespace).await? {
221 Self::delete(config, namespace).await?;
222 }
223 Self::create(config, namespace).await?;
224 Self::connect(config, namespace).await
225 }
226 }
227}
228
229pub trait DirectKeyValueStore: ReadableKeyValueStore + DirectWritableKeyValueStore {}
235
236impl<T> DirectKeyValueStore for T where T: ReadableKeyValueStore + DirectWritableKeyValueStore {}
237
238pub trait KeyValueStore: ReadableKeyValueStore + WritableKeyValueStore {}
244
245impl<T> KeyValueStore for T where T: ReadableKeyValueStore + WritableKeyValueStore {}
246
/// Test-only helpers for databases; compiled only under `cfg(with_testing)`.
#[cfg(with_testing)]
pub trait TestKeyValueDatabase: KeyValueDatabase {
    /// Builds a configuration suitable for tests.
    async fn new_test_config() -> Result<Self::Config, Self::Error>;

    /// Connects to a namespace named by `generate_test_namespace`, using the test
    /// configuration. The namespace is recreated, so any previous content is dropped.
    async fn connect_test_namespace() -> Result<Self, Self::Error> {
        // The configuration is obtained before the namespace name is generated, so a
        // failing configuration never triggers the name generation.
        let test_config = Self::new_test_config().await?;
        Self::recreate_and_connect(&test_config, &generate_test_namespace()).await
    }

    /// Connects to a fresh test namespace and opens, for shared access, the store
    /// located at the empty root key.
    async fn new_test_store() -> Result<Self::Store, Self::Error> {
        Self::connect_test_namespace().await?.open_shared(&[])
    }
}
266
267pub mod inactive_store {
269 use super::*;
270
271 pub struct InactiveStore;
273
274 #[derive(Clone, Copy, Debug)]
276 pub struct InactiveStoreError;
277
278 impl std::fmt::Display for InactiveStoreError {
279 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
280 write!(f, "inactive store error")
281 }
282 }
283
284 impl From<bcs::Error> for InactiveStoreError {
285 fn from(_other: bcs::Error) -> Self {
286 Self
287 }
288 }
289
290 impl std::error::Error for InactiveStoreError {}
291
292 impl KeyValueStoreError for InactiveStoreError {
293 const BACKEND: &'static str = "inactive";
294 }
295
296 impl WithError for InactiveStore {
297 type Error = InactiveStoreError;
298 }
299
300 impl ReadableKeyValueStore for InactiveStore {
301 const MAX_KEY_SIZE: usize = 0;
302
303 fn max_stream_queries(&self) -> usize {
304 0
305 }
306
307 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
308 panic!("attempt to read from an inactive store!")
309 }
310
311 async fn read_value_bytes(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
312 panic!("attempt to read from an inactive store!")
313 }
314
315 async fn contains_key(&self, _key: &[u8]) -> Result<bool, Self::Error> {
316 panic!("attempt to read from an inactive store!")
317 }
318
319 async fn contains_keys(&self, _keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
320 panic!("attempt to read from an inactive store!")
321 }
322
323 async fn read_multi_values_bytes(
324 &self,
325 _keys: &[Vec<u8>],
326 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
327 panic!("attempt to read from an inactive store!")
328 }
329
330 async fn find_keys_by_prefix(
331 &self,
332 _key_prefix: &[u8],
333 ) -> Result<Vec<Vec<u8>>, Self::Error> {
334 panic!("attempt to read from an inactive store!")
335 }
336
337 async fn find_key_values_by_prefix(
339 &self,
340 _key_prefix: &[u8],
341 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
342 panic!("attempt to read from an inactive store!")
343 }
344 }
345
346 impl WritableKeyValueStore for InactiveStore {
347 const MAX_VALUE_SIZE: usize = 0;
348
349 async fn write_batch(&self, _batch: Batch) -> Result<(), Self::Error> {
350 panic!("attempt to write to an inactive store!")
351 }
352
353 async fn clear_journal(&self) -> Result<(), Self::Error> {
354 panic!("attempt to write to an inactive store!")
355 }
356 }
357}