Skip to main content

linera_views/backends/
dynamo_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the DynamoDB database.
5
6use std::{
7    collections::HashMap,
8    env,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17    error::SdkError,
18    operation::{
19        create_table::CreateTableError,
20        delete_table::DeleteTableError,
21        get_item::GetItemError,
22        list_tables::ListTablesError,
23        query::{QueryError, QueryOutput},
24        transact_write_items::TransactWriteItemsError,
25    },
26    primitives::Blob,
27    types::{
28        AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29        ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30    },
31    Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::{ensure, util::future::FutureSyncExt as _};
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredDatabase;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueDatabase;
43use crate::{
44    batch::SimpleUnorderedBatch,
45    common::get_uleb128_size,
46    journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
47    lru_caching::{LruCachingConfig, LruCachingDatabase},
48    store::{
49        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
50        WithError,
51    },
52    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
53};
54
55/// Name of the environment variable with the address to a DynamoDB local instance.
56const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
57
58/// Gets the AWS configuration from the environment
59async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
60    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
61        .boxed_sync()
62        .await;
63    Ok((&base_config).into())
64}
65
66fn get_endpoint_address() -> Option<String> {
67    env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
68}
69
70/// Gets the DynamoDB local config
71async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
72{
73    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
74        .boxed_sync()
75        .await;
76    let endpoint_address = get_endpoint_address().unwrap();
77    let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
78        .endpoint_url(endpoint_address)
79        .build();
80    Ok(config)
81}
82
83/// DynamoDB forbids the iteration over the partition keys.
84/// Therefore we use a special partition key named `[1]` for storing
85/// the root keys. For normal root keys, we simply put a `[0]` in
86/// front therefore no intersection is possible.
87const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
88
89/// The attribute name of the partition key.
90const PARTITION_ATTRIBUTE: &str = "item_partition";
91
92/// A root key being used for testing existence of tables
93const EMPTY_ROOT_KEY: &[u8] = &[0];
94
95/// A key being used for testing existence of tables
96const DB_KEY: &[u8] = &[0];
97
98/// The attribute name of the primary key (used as a sort key).
99const KEY_ATTRIBUTE: &str = "item_key";
100
101/// The attribute name of the table value blob.
102const VALUE_ATTRIBUTE: &str = "item_value";
103
104/// The attribute for obtaining the primary key (used as a sort key) with the stored value.
105const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
106
107/// TODO(#1084): The scheme below with the `MAX_VALUE_SIZE` has to be checked
108/// This is the maximum size of a raw value in DynamoDB.
109const RAW_MAX_VALUE_SIZE: usize = 409600;
110
111/// Fundamental constants in DynamoDB: The maximum size of a value is 400 KB
112/// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html
113/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`
114/// Therefore the actual `MAX_VALUE_SIZE` might be lower.
115/// At the maximum key size is 1024 bytes (see below) and we pack just one entry.
116/// So if the key has 1024 bytes this gets us the inequality
117/// `1 + 1 + serialized_size(1024)? + serialized_size(x)? <= 400*1024`
118/// and so this simplifies to `1 + 1 + (2 + 1024) + (3 + x) <= 400 * 1024`
119/// Note on the following formula:
120/// * We write 3 because `get_uleb128_size(400*1024) == 3`
121/// * We write `1 + 1` because the `SimpleUnorderedBatch` has two entries
122///
123/// This gets us a maximal value of 408569;
124const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
125    - MAX_KEY_SIZE
126    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
127    - get_uleb128_size(MAX_KEY_SIZE)
128    - 1
129    - 1;
130
131/// Fundamental constant in DynamoDB: The maximum size of a key is 1024 bytes
132/// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html
133const MAX_KEY_SIZE: usize = 1024;
134
135/// Fundamental constants in DynamoDB: The maximum size of a [`TransactWriteItem`] is 4 MB.
136/// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html
137/// We're taking a conservative value because the mode of computation is unclear.
138const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
139
140/// The DynamoDB database is potentially handling an infinite number of connections.
141/// However, for testing or some other purpose we really need to decrease the number of
142/// connections.
143#[cfg(with_testing)]
144const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
145
146/// The number of entries in a stream of the tests can be controlled by this parameter for tests.
147#[cfg(with_testing)]
148const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
149
150/// Fundamental constants in DynamoDB: The maximum size of a [`TransactWriteItem`] is 100.
151/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
152const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
153
154/// Builds the key attributes for a table item.
155///
156/// The key is composed of two attributes that are both binary blobs. The first attribute is a
157/// partition key and is currently just a dummy value that ensures all items are in the same
158/// partition. This is necessary for range queries to work correctly.
159///
160/// The second attribute is the actual key value, which is generated by concatenating the
161/// context prefix. `The Vec<u8>` expression is obtained from `self.derive_key`.
162fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
163    [
164        (
165            PARTITION_ATTRIBUTE.to_owned(),
166            AttributeValue::B(Blob::new(start_key.to_vec())),
167        ),
168        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
169    ]
170    .into()
171}
172
173/// Builds the value attribute for storing a table item.
174fn build_key_value(
175    start_key: &[u8],
176    key: Vec<u8>,
177    value: Vec<u8>,
178) -> HashMap<String, AttributeValue> {
179    [
180        (
181            PARTITION_ATTRIBUTE.to_owned(),
182            AttributeValue::B(Blob::new(start_key.to_vec())),
183        ),
184        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
185        (
186            VALUE_ATTRIBUTE.to_owned(),
187            AttributeValue::B(Blob::new(value)),
188        ),
189    ]
190    .into()
191}
192
193/// Checks that a key is of the correct size
194fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
195    ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
196    ensure!(
197        key.len() <= MAX_KEY_SIZE,
198        DynamoDbStoreInternalError::KeyTooLong
199    );
200    Ok(())
201}
202
203/// Extracts the key attribute from an item.
204fn extract_key(
205    prefix_len: usize,
206    attributes: &HashMap<String, AttributeValue>,
207) -> Result<&[u8], DynamoDbStoreInternalError> {
208    let key = attributes
209        .get(KEY_ATTRIBUTE)
210        .ok_or(DynamoDbStoreInternalError::MissingKey)?;
211    match key {
212        AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
213        key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
214    }
215}
216
217/// Extracts the value attribute from an item.
218fn extract_value(
219    attributes: &HashMap<String, AttributeValue>,
220) -> Result<&[u8], DynamoDbStoreInternalError> {
221    // According to the official AWS DynamoDB documentation:
222    // "Binary must have a length greater than zero if the attribute is used as a key attribute for a table or index"
223    let value = attributes
224        .get(VALUE_ATTRIBUTE)
225        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
226    match value {
227        AttributeValue::B(blob) => Ok(blob.as_ref()),
228        value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
229    }
230}
231
232/// Extracts the value attribute from an item (returned by value).
233fn extract_value_owned(
234    attributes: &mut HashMap<String, AttributeValue>,
235) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
236    let value = attributes
237        .remove(VALUE_ATTRIBUTE)
238        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
239    match value {
240        AttributeValue::B(blob) => Ok(blob.into_inner()),
241        value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
242    }
243}
244
245/// Extracts the key and value attributes from an item.
246fn extract_key_value(
247    prefix_len: usize,
248    attributes: &HashMap<String, AttributeValue>,
249) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
250    let key = extract_key(prefix_len, attributes)?;
251    let value = extract_value(attributes)?;
252    Ok((key, value))
253}
254
255struct TransactionBuilder {
256    start_key: Vec<u8>,
257    transactions: Vec<TransactWriteItem>,
258}
259
260impl TransactionBuilder {
261    fn new(start_key: &[u8]) -> Self {
262        Self {
263            start_key: start_key.to_vec(),
264            transactions: Vec::new(),
265        }
266    }
267
268    fn insert_delete_request(
269        &mut self,
270        key: Vec<u8>,
271        store: &DynamoDbStoreInternal,
272    ) -> Result<(), DynamoDbStoreInternalError> {
273        let transaction = store.build_delete_transaction(&self.start_key, key)?;
274        self.transactions.push(transaction);
275        Ok(())
276    }
277
278    fn insert_put_request(
279        &mut self,
280        key: Vec<u8>,
281        value: Vec<u8>,
282        store: &DynamoDbStoreInternal,
283    ) -> Result<(), DynamoDbStoreInternalError> {
284        let transaction = store.build_put_transaction(&self.start_key, key, value)?;
285        self.transactions.push(transaction);
286        Ok(())
287    }
288}
289
290/// A DynamoDB client.
291#[derive(Clone, Debug)]
292pub struct DynamoDbStoreInternal {
293    client: Client,
294    namespace: String,
295    semaphore: Option<Arc<Semaphore>>,
296    max_stream_queries: usize,
297    start_key: Vec<u8>,
298    root_key_written: Arc<AtomicBool>,
299}
300
301/// Database-level connection to DynamoDB for managing namespaces and partitions.
302#[derive(Clone)]
303pub struct DynamoDbDatabaseInternal {
304    client: Client,
305    namespace: String,
306    semaphore: Option<Arc<Semaphore>>,
307    max_stream_queries: usize,
308}
309
310impl WithError for DynamoDbDatabaseInternal {
311    type Error = DynamoDbStoreInternalError;
312}
313
314/// The initial configuration of the system.
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct DynamoDbStoreInternalConfig {
317    /// Whether to use DynamoDB local or not.
318    pub use_dynamodb_local: bool,
319    /// Maximum number of concurrent database queries allowed for this client.
320    pub max_concurrent_queries: Option<usize>,
321    /// Preferred buffer size for async streams.
322    pub max_stream_queries: usize,
323}
324
325impl DynamoDbStoreInternalConfig {
326    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
327        let config = if self.use_dynamodb_local {
328            get_dynamodb_local_config().await?
329        } else {
330            get_base_config().await?
331        };
332        Ok(Client::from_conf(config))
333    }
334}
335
336impl KeyValueDatabase for DynamoDbDatabaseInternal {
337    type Config = DynamoDbStoreInternalConfig;
338    type Store = DynamoDbStoreInternal;
339
340    fn get_name() -> String {
341        "dynamodb internal".to_string()
342    }
343
344    async fn connect(
345        config: &Self::Config,
346        namespace: &str,
347    ) -> Result<Self, DynamoDbStoreInternalError> {
348        Self::check_namespace(namespace)?;
349        let client = config.client().await?;
350        let semaphore = config
351            .max_concurrent_queries
352            .map(|n| Arc::new(Semaphore::new(n)));
353        let max_stream_queries = config.max_stream_queries;
354        let namespace = namespace.to_string();
355        let store = Self {
356            client,
357            namespace,
358            semaphore,
359            max_stream_queries,
360        };
361        Ok(store)
362    }
363
364    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
365        let mut start_key = EMPTY_ROOT_KEY.to_vec();
366        start_key.extend(root_key);
367        Ok(self.open_internal(start_key))
368    }
369
370    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
371        self.open_shared(root_key)
372    }
373
374    async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
375        let client = config.client().await?;
376        let mut namespaces = Vec::new();
377        let mut start_table = None;
378        loop {
379            let response = client
380                .list_tables()
381                .set_exclusive_start_table_name(start_table)
382                .send()
383                .boxed_sync()
384                .await?;
385            if let Some(namespaces_blk) = response.table_names {
386                namespaces.extend(namespaces_blk);
387            }
388            if response.last_evaluated_table_name.is_none() {
389                break;
390            } else {
391                start_table = response.last_evaluated_table_name;
392            }
393        }
394        Ok(namespaces)
395    }
396
397    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
398        let store = self.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
399        store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
400    }
401
402    async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
403        let client = config.client().await?;
404        let tables = Self::list_all(config).await?;
405        for table in tables {
406            client
407                .delete_table()
408                .table_name(&table)
409                .send()
410                .boxed_sync()
411                .await?;
412        }
413        Ok(())
414    }
415
416    async fn exists(
417        config: &Self::Config,
418        namespace: &str,
419    ) -> Result<bool, DynamoDbStoreInternalError> {
420        Self::check_namespace(namespace)?;
421        let client = config.client().await?;
422        let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
423        let response = client
424            .get_item()
425            .table_name(namespace)
426            .set_key(Some(key_db))
427            .send()
428            .boxed_sync()
429            .await;
430        let Err(error) = response else {
431            return Ok(true);
432        };
433        let test = match &error {
434            SdkError::ServiceError(error) => match error.err() {
435                GetItemError::ResourceNotFoundException(error) => {
436                    error.message
437                        == Some("Cannot do operations on a non-existent table".to_string())
438                }
439                _ => false,
440            },
441            _ => false,
442        };
443        if test {
444            Ok(false)
445        } else {
446            Err(error.into())
447        }
448    }
449
450    async fn create(
451        config: &Self::Config,
452        namespace: &str,
453    ) -> Result<(), DynamoDbStoreInternalError> {
454        Self::check_namespace(namespace)?;
455        let client = config.client().await?;
456        client
457            .create_table()
458            .table_name(namespace)
459            .attribute_definitions(
460                AttributeDefinition::builder()
461                    .attribute_name(PARTITION_ATTRIBUTE)
462                    .attribute_type(ScalarAttributeType::B)
463                    .build()?,
464            )
465            .attribute_definitions(
466                AttributeDefinition::builder()
467                    .attribute_name(KEY_ATTRIBUTE)
468                    .attribute_type(ScalarAttributeType::B)
469                    .build()?,
470            )
471            .key_schema(
472                KeySchemaElement::builder()
473                    .attribute_name(PARTITION_ATTRIBUTE)
474                    .key_type(KeyType::Hash)
475                    .build()?,
476            )
477            .key_schema(
478                KeySchemaElement::builder()
479                    .attribute_name(KEY_ATTRIBUTE)
480                    .key_type(KeyType::Range)
481                    .build()?,
482            )
483            .provisioned_throughput(
484                ProvisionedThroughput::builder()
485                    .read_capacity_units(10)
486                    .write_capacity_units(10)
487                    .build()?,
488            )
489            .send()
490            .boxed_sync()
491            .await?;
492        Ok(())
493    }
494
495    async fn delete(
496        config: &Self::Config,
497        namespace: &str,
498    ) -> Result<(), DynamoDbStoreInternalError> {
499        Self::check_namespace(namespace)?;
500        let client = config.client().await?;
501        client
502            .delete_table()
503            .table_name(namespace)
504            .send()
505            .boxed_sync()
506            .await?;
507        Ok(())
508    }
509}
510
511impl DynamoDbDatabaseInternal {
512    /// Namespaces are named table names in DynamoDB [naming
513    /// rules](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules),
514    /// so we need to check correctness of the namespace
515    fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
516        if namespace.len() < 3 {
517            return Err(InvalidNamespace::TooShort);
518        }
519        if namespace.len() > 255 {
520            return Err(InvalidNamespace::TooLong);
521        }
522        if !namespace.chars().all(|character| {
523            character.is_ascii_alphanumeric()
524                || character == '.'
525                || character == '-'
526                || character == '_'
527        }) {
528            return Err(InvalidNamespace::InvalidCharacter);
529        }
530        Ok(())
531    }
532
533    fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
534        let client = self.client.clone();
535        let namespace = self.namespace.clone();
536        let semaphore = self.semaphore.clone();
537        let max_stream_queries = self.max_stream_queries;
538        DynamoDbStoreInternal {
539            client,
540            namespace,
541            semaphore,
542            max_stream_queries,
543            start_key,
544            root_key_written: Arc::new(AtomicBool::new(false)),
545        }
546    }
547}
548
549impl DynamoDbStoreInternal {
550    fn build_delete_transaction(
551        &self,
552        start_key: &[u8],
553        key: Vec<u8>,
554    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
555        check_key_size(&key)?;
556        let request = Delete::builder()
557            .table_name(&self.namespace)
558            .set_key(Some(build_key(start_key, key)))
559            .build()?;
560        Ok(TransactWriteItem::builder().delete(request).build())
561    }
562
563    fn build_put_transaction(
564        &self,
565        start_key: &[u8],
566        key: Vec<u8>,
567        value: Vec<u8>,
568    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
569        check_key_size(&key)?;
570        ensure!(
571            value.len() <= RAW_MAX_VALUE_SIZE,
572            DynamoDbStoreInternalError::ValueLengthTooLarge
573        );
574        let request = Put::builder()
575            .table_name(&self.namespace)
576            .set_item(Some(build_key_value(start_key, key, value)))
577            .build()?;
578        Ok(TransactWriteItem::builder().put(request).build())
579    }
580
581    /// Obtains the semaphore lock on the database if needed.
582    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
583        match &self.semaphore {
584            None => None,
585            Some(count) => Some(count.acquire().await),
586        }
587    }
588
589    async fn get_query_output(
590        &self,
591        attribute_str: &str,
592        start_key: &[u8],
593        key_prefix: &[u8],
594        start_key_map: Option<HashMap<String, AttributeValue>>,
595    ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
596        let _guard = self.acquire().await;
597        let start_key = start_key.to_vec();
598        let response = self
599            .client
600            .query()
601            .table_name(&self.namespace)
602            .projection_expression(attribute_str)
603            .key_condition_expression(format!(
604                "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
605            ))
606            .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
607            .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
608            .set_exclusive_start_key(start_key_map)
609            .send()
610            .boxed_sync()
611            .await?;
612        Ok(response)
613    }
614
615    async fn read_value_bytes_general(
616        &self,
617        key_db: HashMap<String, AttributeValue>,
618    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
619        let _guard = self.acquire().await;
620        let response = self
621            .client
622            .get_item()
623            .table_name(&self.namespace)
624            .set_key(Some(key_db))
625            .send()
626            .boxed_sync()
627            .await?;
628
629        match response.item {
630            Some(mut item) => {
631                let value = extract_value_owned(&mut item)?;
632                Ok(Some(value))
633            }
634            None => Ok(None),
635        }
636    }
637
638    async fn contains_key_general(
639        &self,
640        key_db: HashMap<String, AttributeValue>,
641    ) -> Result<bool, DynamoDbStoreInternalError> {
642        let _guard = self.acquire().await;
643        let response = self
644            .client
645            .get_item()
646            .table_name(&self.namespace)
647            .set_key(Some(key_db))
648            .projection_expression(PARTITION_ATTRIBUTE)
649            .send()
650            .boxed_sync()
651            .await?;
652
653        Ok(response.item.is_some())
654    }
655
656    async fn get_list_responses(
657        &self,
658        attribute: &str,
659        start_key: &[u8],
660        key_prefix: &[u8],
661    ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
662        check_key_size(key_prefix)?;
663        let mut responses = Vec::new();
664        let mut start_key_map = None;
665        loop {
666            let response = self
667                .get_query_output(attribute, start_key, key_prefix, start_key_map)
668                .await?;
669            let last_evaluated = response.last_evaluated_key.clone();
670            responses.push(response);
671            match last_evaluated {
672                None => {
673                    break;
674                }
675                Some(value) => {
676                    start_key_map = Some(value);
677                }
678            }
679        }
680        Ok(QueryResponses {
681            prefix_len: key_prefix.len(),
682            responses,
683        })
684    }
685}
686
687struct QueryResponses {
688    prefix_len: usize,
689    responses: Vec<QueryOutput>,
690}
691
692impl QueryResponses {
693    fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
694        self.responses
695            .iter()
696            .flat_map(|response| response.items.iter().flatten())
697            .map(|item| extract_key(self.prefix_len, item))
698    }
699
700    fn key_values(
701        &self,
702    ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
703        self.responses
704            .iter()
705            .flat_map(|response| response.items.iter().flatten())
706            .map(|item| extract_key_value(self.prefix_len, item))
707    }
708}
709
710impl WithError for DynamoDbStoreInternal {
711    type Error = DynamoDbStoreInternalError;
712}
713
714impl ReadableKeyValueStore for DynamoDbStoreInternal {
715    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
716
717    fn max_stream_queries(&self) -> usize {
718        self.max_stream_queries
719    }
720
721    fn root_key(&self) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
722        assert!(self.start_key.starts_with(EMPTY_ROOT_KEY));
723        Ok(self.start_key[EMPTY_ROOT_KEY.len()..].to_vec())
724    }
725
726    async fn read_value_bytes(
727        &self,
728        key: &[u8],
729    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
730        check_key_size(key)?;
731        let key_db = build_key(&self.start_key, key.to_vec());
732        self.read_value_bytes_general(key_db).await
733    }
734
735    async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
736        check_key_size(key)?;
737        let key_db = build_key(&self.start_key, key.to_vec());
738        self.contains_key_general(key_db).await
739    }
740
741    async fn contains_keys(
742        &self,
743        keys: &[Vec<u8>],
744    ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
745        let mut handles = Vec::new();
746        for key in keys {
747            check_key_size(key)?;
748            let key_db = build_key(&self.start_key, key.clone());
749            let handle = self.contains_key_general(key_db);
750            handles.push(handle);
751        }
752        join_all(handles)
753            .await
754            .into_iter()
755            .collect::<Result<_, _>>()
756    }
757
758    async fn read_multi_values_bytes(
759        &self,
760        keys: &[Vec<u8>],
761    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
762        let mut handles = Vec::new();
763        for key in keys {
764            check_key_size(key)?;
765            let key_db = build_key(&self.start_key, key.to_vec());
766            let handle = self.read_value_bytes_general(key_db);
767            handles.push(handle);
768        }
769        join_all(handles)
770            .await
771            .into_iter()
772            .collect::<Result<_, _>>()
773    }
774
775    async fn find_keys_by_prefix(
776        &self,
777        key_prefix: &[u8],
778    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
779        let result_queries = self
780            .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
781            .await?;
782        result_queries
783            .keys()
784            .map(|key| key.map(|k| k.to_vec()))
785            .collect()
786    }
787
788    async fn find_key_values_by_prefix(
789        &self,
790        key_prefix: &[u8],
791    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
792        let result_queries = self
793            .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
794            .await?;
795        result_queries
796            .key_values()
797            .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
798            .collect()
799    }
800}
801
802impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
803    const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
804    const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
805    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
806
807    // DynamoDB does not support the `DeletePrefix` operation.
808    type Batch = SimpleUnorderedBatch;
809
810    async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
811        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
812            let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
813            builder.insert_put_request(self.start_key.clone(), vec![], self)?;
814            self.client
815                .transact_write_items()
816                .set_transact_items(Some(builder.transactions))
817                .send()
818                .boxed_sync()
819                .await?;
820        }
821        let mut builder = TransactionBuilder::new(&self.start_key);
822        for key in batch.deletions {
823            builder.insert_delete_request(key, self)?;
824        }
825        for (key, value) in batch.insertions {
826            builder.insert_put_request(key, value, self)?;
827        }
828        if !builder.transactions.is_empty() {
829            let _guard = self.acquire().await;
830            self.client
831                .transact_write_items()
832                .set_transact_items(Some(builder.transactions))
833                .send()
834                .boxed_sync()
835                .await?;
836        }
837        Ok(())
838    }
839}
840
841/// Error when validating a namespace
842#[derive(Debug, Error)]
843pub enum InvalidNamespace {
844    /// The namespace should be at least 3 characters.
845    #[error("Namespace must have at least 3 characters")]
846    TooShort,
847
848    /// The namespace should be at most 63 characters.
849    #[error("Namespace must be at most 63 characters")]
850    TooLong,
851
852    /// allowed characters are lowercase letters, numbers, periods and hyphens
853    #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
854    InvalidCharacter,
855}
856
857/// Errors that occur when using [`DynamoDbStoreInternal`].
858#[derive(Debug, Error)]
859pub enum DynamoDbStoreInternalError {
860    /// An error occurred while getting the item.
861    #[error(transparent)]
862    Get(#[from] Box<SdkError<GetItemError>>),
863
864    /// An error occurred while writing a transaction of items.
865    #[error(transparent)]
866    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),
867
868    /// An error occurred while doing a Query.
869    #[error(transparent)]
870    Query(#[from] Box<SdkError<QueryError>>),
871
872    /// An error occurred while deleting a table
873    #[error(transparent)]
874    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),
875
876    /// An error occurred while listing tables
877    #[error(transparent)]
878    ListTables(#[from] Box<SdkError<ListTablesError>>),
879
880    /// The transact maximum size is `MAX_TRANSACT_WRITE_ITEM_SIZE`.
881    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
882    TransactUpperLimitSize,
883
884    /// Keys have to be of non-zero length.
885    #[error("The key must be of strictly positive length")]
886    ZeroLengthKey,
887
888    /// The key must have at most 1024 bytes
889    #[error("The key must have at most 1024 bytes")]
890    KeyTooLong,
891
892    /// The key prefix must have at most 1024 bytes
893    #[error("The key prefix must have at most 1024 bytes")]
894    KeyPrefixTooLong,
895
896    /// Key prefixes have to be of non-zero length.
897    #[error("The key_prefix must be of strictly positive length")]
898    ZeroLengthKeyPrefix,
899
900    /// The journal is not coherent
901    #[error(transparent)]
902    JournalConsistencyError(#[from] JournalConsistencyError),
903
904    /// The length of the value should be at most 400 KB.
905    #[error("The DynamoDB value should be less than 400 KB")]
906    ValueLengthTooLarge,
907
908    /// The stored key is missing.
909    #[error("The stored key attribute is missing")]
910    MissingKey,
911
912    /// The type of the keys was not correct (It should have been a binary blob).
913    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
914    WrongKeyType(String),
915
916    /// The value attribute is missing.
917    #[error("The stored value attribute is missing")]
918    MissingValue,
919
920    /// The value was stored as the wrong type (it should be a binary blob).
921    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
922    WrongValueType(String),
923
924    /// A BCS error occurred.
925    #[error(transparent)]
926    BcsError(#[from] bcs::Error),
927
928    /// A wrong namespace error occurred
929    #[error(transparent)]
930    InvalidNamespace(#[from] InvalidNamespace),
931
932    /// An error occurred while creating the table.
933    #[error(transparent)]
934    CreateTable(#[from] Box<SdkError<CreateTableError>>),
935
936    /// An error occurred while building an object
937    #[error(transparent)]
938    Build(#[from] Box<BuildError>),
939}
940
941impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
942where
943    DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
944{
945    fn from(error: SdkError<InnerError>) -> Self {
946        Box::new(error).into()
947    }
948}
949
950impl From<BuildError> for DynamoDbStoreInternalError {
951    fn from(error: BuildError) -> Self {
952        Box::new(error).into()
953    }
954}
955
956impl DynamoDbStoreInternalError {
957    /// Creates a [`DynamoDbStoreInternalError::WrongKeyType`] instance based on the returned value type.
958    ///
959    /// # Panics
960    ///
961    /// If the value type is in the correct type, a binary blob.
962    pub fn wrong_key_type(value: &AttributeValue) -> Self {
963        DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
964    }
965
966    /// Creates a [`DynamoDbStoreInternalError::WrongValueType`] instance based on the returned value type.
967    ///
968    /// # Panics
969    ///
970    /// If the value type is in the correct type, a binary blob.
971    pub fn wrong_value_type(value: &AttributeValue) -> Self {
972        DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
973    }
974
975    fn type_description_of(value: &AttributeValue) -> String {
976        match value {
977            AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
978            AttributeValue::Bool(_) => "a boolean",
979            AttributeValue::Bs(_) => "a list of binary blobs",
980            AttributeValue::L(_) => "a list",
981            AttributeValue::M(_) => "a map",
982            AttributeValue::N(_) => "a number",
983            AttributeValue::Ns(_) => "a list of numbers",
984            AttributeValue::Null(_) => "a null value",
985            AttributeValue::S(_) => "a string",
986            AttributeValue::Ss(_) => "a list of strings",
987            _ => "an unknown type",
988        }
989        .to_owned()
990    }
991}
992
993impl KeyValueStoreError for DynamoDbStoreInternalError {
994    const BACKEND: &'static str = "dynamo_db";
995}
996
997#[cfg(with_testing)]
998impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
999    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
1000        Ok(DynamoDbStoreInternalConfig {
1001            use_dynamodb_local: true,
1002            max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
1003            max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
1004        })
1005    }
1006}
1007
1008/// The combined error type for [`DynamoDbDatabase`].
1009pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1010
1011/// The config type for [`DynamoDbDatabase`]`
1012pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1013
1014/// A shared DB client for DynamoDB with metrics
1015#[cfg(with_metrics)]
1016pub type DynamoDbDatabase = MeteredDatabase<
1017    LruCachingDatabase<
1018        MeteredDatabase<
1019            ValueSplittingDatabase<
1020                MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1021            >,
1022        >,
1023    >,
1024>;
1025/// A shared DB client for DynamoDB
1026#[cfg(not(with_metrics))]
1027pub type DynamoDbDatabase = LruCachingDatabase<
1028    ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1029>;
1030
1031#[cfg(test)]
1032mod tests {
1033    use bcs::serialized_size;
1034
1035    use crate::common::get_uleb128_size;
1036
1037    #[test]
1038    fn test_serialization_len() {
1039        for n in [0, 10, 127, 128, 129, 16383, 16384, 20000] {
1040            let vec = vec![0u8; n];
1041            let est_size = get_uleb128_size(n) + n;
1042            let serial_size = serialized_size(&vec).unwrap();
1043            assert_eq!(est_size, serial_size);
1044        }
1045    }
1046}