dynamo_es/
helpers.rs

1use aws_sdk_dynamodb::client::Client;
2use aws_sdk_dynamodb::operation::query::QueryOutput;
3use aws_sdk_dynamodb::types::{AttributeValue, TransactWriteItem};
4use serde_json::Value;
5use std::collections::HashMap;
6
7use crate::error::DynamoAggregateError;
8
9pub(crate) async fn load_dynamo_view(
10    client: &Client,
11    table_name: &str,
12    view_id: &str,
13) -> Result<QueryOutput, DynamoAggregateError> {
14    Ok(client
15        .query()
16        .table_name(table_name)
17        .key_condition_expression("#view_type_id = :view_type_id")
18        .expression_attribute_names("#view_type_id", "ViewId")
19        .expression_attribute_values(":view_type_id", AttributeValue::S(String::from(view_id)))
20        .send()
21        .await?)
22}
23
24pub(crate) async fn commit_transactions(
25    client: &Client,
26    transactions: Vec<TransactWriteItem>,
27) -> Result<(), DynamoAggregateError> {
28    let transaction_len = transactions.len();
29    if transaction_len > 25 {
30        return Err(DynamoAggregateError::TransactionListTooLong(
31            transaction_len,
32        ));
33    }
34    client
35        .transact_write_items()
36        .set_transact_items(Some(transactions))
37        .send()
38        .await?;
39    Ok(())
40}
41
42pub(crate) fn att_as_value(
43    values: &HashMap<String, AttributeValue>,
44    attribute_name: &str,
45) -> Result<Value, DynamoAggregateError> {
46    let attribute = require_attribute(values, attribute_name)?;
47    match attribute.as_b() {
48        Ok(payload_blob) => Ok(serde_json::from_slice(payload_blob.as_ref())?),
49        Err(_) => Err(DynamoAggregateError::MissingAttribute(
50            attribute_name.to_string(),
51        )),
52    }
53}
54
55pub(crate) fn att_as_number(
56    values: &HashMap<String, AttributeValue>,
57    attribute_name: &str,
58) -> Result<usize, DynamoAggregateError> {
59    let attribute = require_attribute(values, attribute_name)?;
60    match attribute.as_n() {
61        Ok(attribute_as_n) => match attribute_as_n.parse::<usize>() {
62            Ok(attribute_number) => Ok(attribute_number),
63            Err(_) => Err(DynamoAggregateError::MissingAttribute(
64                attribute_name.to_string(),
65            )),
66        },
67        Err(_) => Err(DynamoAggregateError::MissingAttribute(
68            attribute_name.to_string(),
69        )),
70    }
71}
72
73pub(crate) fn att_as_string(
74    values: &HashMap<String, AttributeValue>,
75    attribute_name: &str,
76) -> Result<String, DynamoAggregateError> {
77    let attribute = require_attribute(values, attribute_name)?;
78    match attribute.as_s() {
79        Ok(attribute_as_s) => Ok(attribute_as_s.to_string()),
80        Err(_) => Err(DynamoAggregateError::MissingAttribute(
81            attribute_name.to_string(),
82        )),
83    }
84}
85
86pub(crate) fn require_attribute<'a>(
87    values: &'a HashMap<String, AttributeValue>,
88    attribute_name: &str,
89) -> Result<&'a AttributeValue, DynamoAggregateError> {
90    match values.get(attribute_name) {
91        Some(attribute) => Ok(attribute),
92        None => Err(DynamoAggregateError::MissingAttribute(
93            attribute_name.to_string(),
94        )),
95    }
96}