1use std::{
7 collections::HashMap,
8 env,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17 error::SdkError,
18 operation::{
19 create_table::CreateTableError,
20 delete_table::DeleteTableError,
21 get_item::GetItemError,
22 list_tables::ListTablesError,
23 query::{QueryError, QueryOutput},
24 transact_write_items::TransactWriteItemsError,
25 },
26 primitives::Blob,
27 types::{
28 AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29 ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30 },
31 Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::{ensure, util::future::FutureSyncExt as _};
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredDatabase;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueDatabase;
43use crate::{
44 batch::SimpleUnorderedBatch,
45 common::get_uleb128_size,
46 journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
47 lru_caching::{LruCachingConfig, LruCachingDatabase},
48 store::{
49 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
50 WithError,
51 },
52 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
53};
54
55const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
57
58async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
60 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
61 .boxed_sync()
62 .await;
63 Ok((&base_config).into())
64}
65
66fn get_endpoint_address() -> Option<String> {
67 env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
68}
69
70async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
72{
73 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
74 .boxed_sync()
75 .await;
76 let endpoint_address = get_endpoint_address().unwrap();
77 let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
78 .endpoint_url(endpoint_address)
79 .build();
80 Ok(config)
81}
82
83const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
88
89const PARTITION_ATTRIBUTE: &str = "item_partition";
91
92const EMPTY_ROOT_KEY: &[u8] = &[0];
94
95const DB_KEY: &[u8] = &[0];
97
98const KEY_ATTRIBUTE: &str = "item_key";
100
101const VALUE_ATTRIBUTE: &str = "item_value";
103
104const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
106
107const RAW_MAX_VALUE_SIZE: usize = 409600;
110
111const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
125 - MAX_KEY_SIZE
126 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
127 - get_uleb128_size(MAX_KEY_SIZE)
128 - 1
129 - 1;
130
131const MAX_KEY_SIZE: usize = 1024;
134
135const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
139
140#[cfg(with_testing)]
144const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
145
146#[cfg(with_testing)]
148const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
149
150const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
153
154fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
163 [
164 (
165 PARTITION_ATTRIBUTE.to_owned(),
166 AttributeValue::B(Blob::new(start_key.to_vec())),
167 ),
168 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
169 ]
170 .into()
171}
172
173fn build_key_value(
175 start_key: &[u8],
176 key: Vec<u8>,
177 value: Vec<u8>,
178) -> HashMap<String, AttributeValue> {
179 [
180 (
181 PARTITION_ATTRIBUTE.to_owned(),
182 AttributeValue::B(Blob::new(start_key.to_vec())),
183 ),
184 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
185 (
186 VALUE_ATTRIBUTE.to_owned(),
187 AttributeValue::B(Blob::new(value)),
188 ),
189 ]
190 .into()
191}
192
193fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
195 ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
196 ensure!(
197 key.len() <= MAX_KEY_SIZE,
198 DynamoDbStoreInternalError::KeyTooLong
199 );
200 Ok(())
201}
202
203fn extract_key(
205 prefix_len: usize,
206 attributes: &HashMap<String, AttributeValue>,
207) -> Result<&[u8], DynamoDbStoreInternalError> {
208 let key = attributes
209 .get(KEY_ATTRIBUTE)
210 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
211 match key {
212 AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
213 key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
214 }
215}
216
217fn extract_value(
219 attributes: &HashMap<String, AttributeValue>,
220) -> Result<&[u8], DynamoDbStoreInternalError> {
221 let value = attributes
224 .get(VALUE_ATTRIBUTE)
225 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
226 match value {
227 AttributeValue::B(blob) => Ok(blob.as_ref()),
228 value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
229 }
230}
231
232fn extract_value_owned(
234 attributes: &mut HashMap<String, AttributeValue>,
235) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
236 let value = attributes
237 .remove(VALUE_ATTRIBUTE)
238 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
239 match value {
240 AttributeValue::B(blob) => Ok(blob.into_inner()),
241 value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
242 }
243}
244
245fn extract_key_value(
247 prefix_len: usize,
248 attributes: &HashMap<String, AttributeValue>,
249) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
250 let key = extract_key(prefix_len, attributes)?;
251 let value = extract_value(attributes)?;
252 Ok((key, value))
253}
254
255struct TransactionBuilder {
256 start_key: Vec<u8>,
257 transactions: Vec<TransactWriteItem>,
258}
259
260impl TransactionBuilder {
261 fn new(start_key: &[u8]) -> Self {
262 Self {
263 start_key: start_key.to_vec(),
264 transactions: Vec::new(),
265 }
266 }
267
268 fn insert_delete_request(
269 &mut self,
270 key: Vec<u8>,
271 store: &DynamoDbStoreInternal,
272 ) -> Result<(), DynamoDbStoreInternalError> {
273 let transaction = store.build_delete_transaction(&self.start_key, key)?;
274 self.transactions.push(transaction);
275 Ok(())
276 }
277
278 fn insert_put_request(
279 &mut self,
280 key: Vec<u8>,
281 value: Vec<u8>,
282 store: &DynamoDbStoreInternal,
283 ) -> Result<(), DynamoDbStoreInternalError> {
284 let transaction = store.build_put_transaction(&self.start_key, key, value)?;
285 self.transactions.push(transaction);
286 Ok(())
287 }
288}
289
290#[derive(Clone, Debug)]
292pub struct DynamoDbStoreInternal {
293 client: Client,
294 namespace: String,
295 semaphore: Option<Arc<Semaphore>>,
296 max_stream_queries: usize,
297 start_key: Vec<u8>,
298 root_key_written: Arc<AtomicBool>,
299}
300
301#[derive(Clone)]
303pub struct DynamoDbDatabaseInternal {
304 client: Client,
305 namespace: String,
306 semaphore: Option<Arc<Semaphore>>,
307 max_stream_queries: usize,
308}
309
310impl WithError for DynamoDbDatabaseInternal {
311 type Error = DynamoDbStoreInternalError;
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct DynamoDbStoreInternalConfig {
317 pub use_dynamodb_local: bool,
319 pub max_concurrent_queries: Option<usize>,
321 pub max_stream_queries: usize,
323}
324
325impl DynamoDbStoreInternalConfig {
326 async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
327 let config = if self.use_dynamodb_local {
328 get_dynamodb_local_config().await?
329 } else {
330 get_base_config().await?
331 };
332 Ok(Client::from_conf(config))
333 }
334}
335
336impl KeyValueDatabase for DynamoDbDatabaseInternal {
337 type Config = DynamoDbStoreInternalConfig;
338 type Store = DynamoDbStoreInternal;
339
340 fn get_name() -> String {
341 "dynamodb internal".to_string()
342 }
343
344 async fn connect(
345 config: &Self::Config,
346 namespace: &str,
347 ) -> Result<Self, DynamoDbStoreInternalError> {
348 Self::check_namespace(namespace)?;
349 let client = config.client().await?;
350 let semaphore = config
351 .max_concurrent_queries
352 .map(|n| Arc::new(Semaphore::new(n)));
353 let max_stream_queries = config.max_stream_queries;
354 let namespace = namespace.to_string();
355 let store = Self {
356 client,
357 namespace,
358 semaphore,
359 max_stream_queries,
360 };
361 Ok(store)
362 }
363
364 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
365 let mut start_key = EMPTY_ROOT_KEY.to_vec();
366 start_key.extend(root_key);
367 Ok(self.open_internal(start_key))
368 }
369
370 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
371 self.open_shared(root_key)
372 }
373
374 async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
375 let client = config.client().await?;
376 let mut namespaces = Vec::new();
377 let mut start_table = None;
378 loop {
379 let response = client
380 .list_tables()
381 .set_exclusive_start_table_name(start_table)
382 .send()
383 .boxed_sync()
384 .await?;
385 if let Some(namespaces_blk) = response.table_names {
386 namespaces.extend(namespaces_blk);
387 }
388 if response.last_evaluated_table_name.is_none() {
389 break;
390 } else {
391 start_table = response.last_evaluated_table_name;
392 }
393 }
394 Ok(namespaces)
395 }
396
397 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
398 let store = self.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
399 store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
400 }
401
402 async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
403 let client = config.client().await?;
404 let tables = Self::list_all(config).await?;
405 for table in tables {
406 client
407 .delete_table()
408 .table_name(&table)
409 .send()
410 .boxed_sync()
411 .await?;
412 }
413 Ok(())
414 }
415
416 async fn exists(
417 config: &Self::Config,
418 namespace: &str,
419 ) -> Result<bool, DynamoDbStoreInternalError> {
420 Self::check_namespace(namespace)?;
421 let client = config.client().await?;
422 let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
423 let response = client
424 .get_item()
425 .table_name(namespace)
426 .set_key(Some(key_db))
427 .send()
428 .boxed_sync()
429 .await;
430 let Err(error) = response else {
431 return Ok(true);
432 };
433 let test = match &error {
434 SdkError::ServiceError(error) => match error.err() {
435 GetItemError::ResourceNotFoundException(error) => {
436 error.message
437 == Some("Cannot do operations on a non-existent table".to_string())
438 }
439 _ => false,
440 },
441 _ => false,
442 };
443 if test {
444 Ok(false)
445 } else {
446 Err(error.into())
447 }
448 }
449
450 async fn create(
451 config: &Self::Config,
452 namespace: &str,
453 ) -> Result<(), DynamoDbStoreInternalError> {
454 Self::check_namespace(namespace)?;
455 let client = config.client().await?;
456 client
457 .create_table()
458 .table_name(namespace)
459 .attribute_definitions(
460 AttributeDefinition::builder()
461 .attribute_name(PARTITION_ATTRIBUTE)
462 .attribute_type(ScalarAttributeType::B)
463 .build()?,
464 )
465 .attribute_definitions(
466 AttributeDefinition::builder()
467 .attribute_name(KEY_ATTRIBUTE)
468 .attribute_type(ScalarAttributeType::B)
469 .build()?,
470 )
471 .key_schema(
472 KeySchemaElement::builder()
473 .attribute_name(PARTITION_ATTRIBUTE)
474 .key_type(KeyType::Hash)
475 .build()?,
476 )
477 .key_schema(
478 KeySchemaElement::builder()
479 .attribute_name(KEY_ATTRIBUTE)
480 .key_type(KeyType::Range)
481 .build()?,
482 )
483 .provisioned_throughput(
484 ProvisionedThroughput::builder()
485 .read_capacity_units(10)
486 .write_capacity_units(10)
487 .build()?,
488 )
489 .send()
490 .boxed_sync()
491 .await?;
492 Ok(())
493 }
494
495 async fn delete(
496 config: &Self::Config,
497 namespace: &str,
498 ) -> Result<(), DynamoDbStoreInternalError> {
499 Self::check_namespace(namespace)?;
500 let client = config.client().await?;
501 client
502 .delete_table()
503 .table_name(namespace)
504 .send()
505 .boxed_sync()
506 .await?;
507 Ok(())
508 }
509}
510
511impl DynamoDbDatabaseInternal {
512 fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
516 if namespace.len() < 3 {
517 return Err(InvalidNamespace::TooShort);
518 }
519 if namespace.len() > 255 {
520 return Err(InvalidNamespace::TooLong);
521 }
522 if !namespace.chars().all(|character| {
523 character.is_ascii_alphanumeric()
524 || character == '.'
525 || character == '-'
526 || character == '_'
527 }) {
528 return Err(InvalidNamespace::InvalidCharacter);
529 }
530 Ok(())
531 }
532
533 fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
534 let client = self.client.clone();
535 let namespace = self.namespace.clone();
536 let semaphore = self.semaphore.clone();
537 let max_stream_queries = self.max_stream_queries;
538 DynamoDbStoreInternal {
539 client,
540 namespace,
541 semaphore,
542 max_stream_queries,
543 start_key,
544 root_key_written: Arc::new(AtomicBool::new(false)),
545 }
546 }
547}
548
549impl DynamoDbStoreInternal {
550 fn build_delete_transaction(
551 &self,
552 start_key: &[u8],
553 key: Vec<u8>,
554 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
555 check_key_size(&key)?;
556 let request = Delete::builder()
557 .table_name(&self.namespace)
558 .set_key(Some(build_key(start_key, key)))
559 .build()?;
560 Ok(TransactWriteItem::builder().delete(request).build())
561 }
562
563 fn build_put_transaction(
564 &self,
565 start_key: &[u8],
566 key: Vec<u8>,
567 value: Vec<u8>,
568 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
569 check_key_size(&key)?;
570 ensure!(
571 value.len() <= RAW_MAX_VALUE_SIZE,
572 DynamoDbStoreInternalError::ValueLengthTooLarge
573 );
574 let request = Put::builder()
575 .table_name(&self.namespace)
576 .set_item(Some(build_key_value(start_key, key, value)))
577 .build()?;
578 Ok(TransactWriteItem::builder().put(request).build())
579 }
580
581 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
583 match &self.semaphore {
584 None => None,
585 Some(count) => Some(count.acquire().await),
586 }
587 }
588
589 async fn get_query_output(
590 &self,
591 attribute_str: &str,
592 start_key: &[u8],
593 key_prefix: &[u8],
594 start_key_map: Option<HashMap<String, AttributeValue>>,
595 ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
596 let _guard = self.acquire().await;
597 let start_key = start_key.to_vec();
598 let response = self
599 .client
600 .query()
601 .table_name(&self.namespace)
602 .projection_expression(attribute_str)
603 .key_condition_expression(format!(
604 "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
605 ))
606 .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
607 .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
608 .set_exclusive_start_key(start_key_map)
609 .send()
610 .boxed_sync()
611 .await?;
612 Ok(response)
613 }
614
615 async fn read_value_bytes_general(
616 &self,
617 key_db: HashMap<String, AttributeValue>,
618 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
619 let _guard = self.acquire().await;
620 let response = self
621 .client
622 .get_item()
623 .table_name(&self.namespace)
624 .set_key(Some(key_db))
625 .send()
626 .boxed_sync()
627 .await?;
628
629 match response.item {
630 Some(mut item) => {
631 let value = extract_value_owned(&mut item)?;
632 Ok(Some(value))
633 }
634 None => Ok(None),
635 }
636 }
637
638 async fn contains_key_general(
639 &self,
640 key_db: HashMap<String, AttributeValue>,
641 ) -> Result<bool, DynamoDbStoreInternalError> {
642 let _guard = self.acquire().await;
643 let response = self
644 .client
645 .get_item()
646 .table_name(&self.namespace)
647 .set_key(Some(key_db))
648 .projection_expression(PARTITION_ATTRIBUTE)
649 .send()
650 .boxed_sync()
651 .await?;
652
653 Ok(response.item.is_some())
654 }
655
656 async fn get_list_responses(
657 &self,
658 attribute: &str,
659 start_key: &[u8],
660 key_prefix: &[u8],
661 ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
662 check_key_size(key_prefix)?;
663 let mut responses = Vec::new();
664 let mut start_key_map = None;
665 loop {
666 let response = self
667 .get_query_output(attribute, start_key, key_prefix, start_key_map)
668 .await?;
669 let last_evaluated = response.last_evaluated_key.clone();
670 responses.push(response);
671 match last_evaluated {
672 None => {
673 break;
674 }
675 Some(value) => {
676 start_key_map = Some(value);
677 }
678 }
679 }
680 Ok(QueryResponses {
681 prefix_len: key_prefix.len(),
682 responses,
683 })
684 }
685}
686
687struct QueryResponses {
688 prefix_len: usize,
689 responses: Vec<QueryOutput>,
690}
691
692impl QueryResponses {
693 fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
694 self.responses
695 .iter()
696 .flat_map(|response| response.items.iter().flatten())
697 .map(|item| extract_key(self.prefix_len, item))
698 }
699
700 fn key_values(
701 &self,
702 ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
703 self.responses
704 .iter()
705 .flat_map(|response| response.items.iter().flatten())
706 .map(|item| extract_key_value(self.prefix_len, item))
707 }
708}
709
710impl WithError for DynamoDbStoreInternal {
711 type Error = DynamoDbStoreInternalError;
712}
713
714impl ReadableKeyValueStore for DynamoDbStoreInternal {
715 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
716
717 fn max_stream_queries(&self) -> usize {
718 self.max_stream_queries
719 }
720
721 fn root_key(&self) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
722 assert!(self.start_key.starts_with(EMPTY_ROOT_KEY));
723 Ok(self.start_key[EMPTY_ROOT_KEY.len()..].to_vec())
724 }
725
726 async fn read_value_bytes(
727 &self,
728 key: &[u8],
729 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
730 check_key_size(key)?;
731 let key_db = build_key(&self.start_key, key.to_vec());
732 self.read_value_bytes_general(key_db).await
733 }
734
735 async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
736 check_key_size(key)?;
737 let key_db = build_key(&self.start_key, key.to_vec());
738 self.contains_key_general(key_db).await
739 }
740
741 async fn contains_keys(
742 &self,
743 keys: &[Vec<u8>],
744 ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
745 let mut handles = Vec::new();
746 for key in keys {
747 check_key_size(key)?;
748 let key_db = build_key(&self.start_key, key.clone());
749 let handle = self.contains_key_general(key_db);
750 handles.push(handle);
751 }
752 join_all(handles)
753 .await
754 .into_iter()
755 .collect::<Result<_, _>>()
756 }
757
758 async fn read_multi_values_bytes(
759 &self,
760 keys: &[Vec<u8>],
761 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
762 let mut handles = Vec::new();
763 for key in keys {
764 check_key_size(key)?;
765 let key_db = build_key(&self.start_key, key.to_vec());
766 let handle = self.read_value_bytes_general(key_db);
767 handles.push(handle);
768 }
769 join_all(handles)
770 .await
771 .into_iter()
772 .collect::<Result<_, _>>()
773 }
774
775 async fn find_keys_by_prefix(
776 &self,
777 key_prefix: &[u8],
778 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
779 let result_queries = self
780 .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
781 .await?;
782 result_queries
783 .keys()
784 .map(|key| key.map(|k| k.to_vec()))
785 .collect()
786 }
787
788 async fn find_key_values_by_prefix(
789 &self,
790 key_prefix: &[u8],
791 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
792 let result_queries = self
793 .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
794 .await?;
795 result_queries
796 .key_values()
797 .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
798 .collect()
799 }
800}
801
802impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
803 const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
804 const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
805 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
806
807 type Batch = SimpleUnorderedBatch;
809
810 async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
811 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
812 let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
813 builder.insert_put_request(self.start_key.clone(), vec![], self)?;
814 self.client
815 .transact_write_items()
816 .set_transact_items(Some(builder.transactions))
817 .send()
818 .boxed_sync()
819 .await?;
820 }
821 let mut builder = TransactionBuilder::new(&self.start_key);
822 for key in batch.deletions {
823 builder.insert_delete_request(key, self)?;
824 }
825 for (key, value) in batch.insertions {
826 builder.insert_put_request(key, value, self)?;
827 }
828 if !builder.transactions.is_empty() {
829 let _guard = self.acquire().await;
830 self.client
831 .transact_write_items()
832 .set_transact_items(Some(builder.transactions))
833 .send()
834 .boxed_sync()
835 .await?;
836 }
837 Ok(())
838 }
839}
840
841#[derive(Debug, Error)]
843pub enum InvalidNamespace {
844 #[error("Namespace must have at least 3 characters")]
846 TooShort,
847
848 #[error("Namespace must be at most 63 characters")]
850 TooLong,
851
852 #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
854 InvalidCharacter,
855}
856
857#[derive(Debug, Error)]
859pub enum DynamoDbStoreInternalError {
860 #[error(transparent)]
862 Get(#[from] Box<SdkError<GetItemError>>),
863
864 #[error(transparent)]
866 TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),
867
868 #[error(transparent)]
870 Query(#[from] Box<SdkError<QueryError>>),
871
872 #[error(transparent)]
874 DeleteTable(#[from] Box<SdkError<DeleteTableError>>),
875
876 #[error(transparent)]
878 ListTables(#[from] Box<SdkError<ListTablesError>>),
879
880 #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
882 TransactUpperLimitSize,
883
884 #[error("The key must be of strictly positive length")]
886 ZeroLengthKey,
887
888 #[error("The key must have at most 1024 bytes")]
890 KeyTooLong,
891
892 #[error("The key prefix must have at most 1024 bytes")]
894 KeyPrefixTooLong,
895
896 #[error("The key_prefix must be of strictly positive length")]
898 ZeroLengthKeyPrefix,
899
900 #[error(transparent)]
902 JournalConsistencyError(#[from] JournalConsistencyError),
903
904 #[error("The DynamoDB value should be less than 400 KB")]
906 ValueLengthTooLarge,
907
908 #[error("The stored key attribute is missing")]
910 MissingKey,
911
912 #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
914 WrongKeyType(String),
915
916 #[error("The stored value attribute is missing")]
918 MissingValue,
919
920 #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
922 WrongValueType(String),
923
924 #[error(transparent)]
926 BcsError(#[from] bcs::Error),
927
928 #[error(transparent)]
930 InvalidNamespace(#[from] InvalidNamespace),
931
932 #[error(transparent)]
934 CreateTable(#[from] Box<SdkError<CreateTableError>>),
935
936 #[error(transparent)]
938 Build(#[from] Box<BuildError>),
939}
940
941impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
942where
943 DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
944{
945 fn from(error: SdkError<InnerError>) -> Self {
946 Box::new(error).into()
947 }
948}
949
950impl From<BuildError> for DynamoDbStoreInternalError {
951 fn from(error: BuildError) -> Self {
952 Box::new(error).into()
953 }
954}
955
956impl DynamoDbStoreInternalError {
957 pub fn wrong_key_type(value: &AttributeValue) -> Self {
963 DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
964 }
965
966 pub fn wrong_value_type(value: &AttributeValue) -> Self {
972 DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
973 }
974
975 fn type_description_of(value: &AttributeValue) -> String {
976 match value {
977 AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
978 AttributeValue::Bool(_) => "a boolean",
979 AttributeValue::Bs(_) => "a list of binary blobs",
980 AttributeValue::L(_) => "a list",
981 AttributeValue::M(_) => "a map",
982 AttributeValue::N(_) => "a number",
983 AttributeValue::Ns(_) => "a list of numbers",
984 AttributeValue::Null(_) => "a null value",
985 AttributeValue::S(_) => "a string",
986 AttributeValue::Ss(_) => "a list of strings",
987 _ => "an unknown type",
988 }
989 .to_owned()
990 }
991}
992
993impl KeyValueStoreError for DynamoDbStoreInternalError {
994 const BACKEND: &'static str = "dynamo_db";
995}
996
997#[cfg(with_testing)]
998impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
999 async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
1000 Ok(DynamoDbStoreInternalConfig {
1001 use_dynamodb_local: true,
1002 max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
1003 max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
1004 })
1005 }
1006}
1007
1008pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1010
1011pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1013
1014#[cfg(with_metrics)]
1016pub type DynamoDbDatabase = MeteredDatabase<
1017 LruCachingDatabase<
1018 MeteredDatabase<
1019 ValueSplittingDatabase<
1020 MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1021 >,
1022 >,
1023 >,
1024>;
1025#[cfg(not(with_metrics))]
1027pub type DynamoDbDatabase = LruCachingDatabase<
1028 ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1029>;
1030
1031#[cfg(test)]
1032mod tests {
1033 use bcs::serialized_size;
1034
1035 use crate::common::get_uleb128_size;
1036
1037 #[test]
1038 fn test_serialization_len() {
1039 for n in [0, 10, 127, 128, 129, 16383, 16384, 20000] {
1040 let vec = vec![0u8; n];
1041 let est_size = get_uleb128_size(n) + n;
1042 let serial_size = serialized_size(&vec).unwrap();
1043 assert_eq!(est_size, serial_size);
1044 }
1045 }
1046}