Skip to main content

dynamo_es/
helpers.rs

1use aws_sdk_dynamodb::client::Client;
2use aws_sdk_dynamodb::operation::query::QueryOutput;
3use aws_sdk_dynamodb::types::{AttributeValue, TransactWriteItem};
4use serde_json::Value;
5use std::collections::HashMap;
6
7use crate::error::DynamoAggregateError;
8
9pub(crate) async fn load_dynamo_view(
10    client: &Client,
11    table_name: &str,
12    view_id: &str,
13) -> Result<QueryOutput, DynamoAggregateError> {
14    Ok(client
15        .query()
16        .table_name(table_name)
17        .key_condition_expression("#view_type_id = :view_type_id")
18        .expression_attribute_names("#view_type_id", "ViewId")
19        .expression_attribute_values(":view_type_id", AttributeValue::S(String::from(view_id)))
20        .send()
21        .await?)
22}
23
24pub(crate) async fn commit_transactions(
25    client: &Client,
26    transactions: Vec<TransactWriteItem>,
27) -> Result<(), DynamoAggregateError> {
28    let transaction_len = transactions.len();
29    if transaction_len > 25 {
30        return Err(DynamoAggregateError::TransactionListTooLong(
31            transaction_len,
32        ));
33    }
34    client
35        .transact_write_items()
36        .set_transact_items(Some(transactions))
37        .send()
38        .await?;
39    Ok(())
40}
41
42pub(crate) fn att_as_value(
43    values: &HashMap<String, AttributeValue>,
44    attribute_name: &str,
45) -> Result<Value, DynamoAggregateError> {
46    let attribute = require_attribute(values, attribute_name)?;
47    match attribute.as_b() {
48        Ok(payload_blob) => Ok(serde_json::from_slice(payload_blob.as_ref())?),
49        Err(_) => Err(DynamoAggregateError::MissingAttribute(
50            attribute_name.to_string(),
51        )),
52    }
53}
54
55pub(crate) fn att_as_number(
56    values: &HashMap<String, AttributeValue>,
57    attribute_name: &str,
58) -> Result<usize, DynamoAggregateError> {
59    let attribute = require_attribute(values, attribute_name)?;
60    match attribute.as_n() {
61        Ok(attribute_as_n) => attribute_as_n
62            .parse::<usize>()
63            .map_err(|_| DynamoAggregateError::MissingAttribute(attribute_name.to_string())),
64        Err(_) => Err(DynamoAggregateError::MissingAttribute(
65            attribute_name.to_string(),
66        )),
67    }
68}
69
70pub(crate) fn att_as_string(
71    values: &HashMap<String, AttributeValue>,
72    attribute_name: &str,
73) -> Result<String, DynamoAggregateError> {
74    let attribute = require_attribute(values, attribute_name)?;
75    match attribute.as_s() {
76        Ok(attribute_as_s) => Ok(attribute_as_s.to_string()),
77        Err(_) => Err(DynamoAggregateError::MissingAttribute(
78            attribute_name.to_string(),
79        )),
80    }
81}
82
83pub(crate) fn require_attribute<'a>(
84    values: &'a HashMap<String, AttributeValue>,
85    attribute_name: &str,
86) -> Result<&'a AttributeValue, DynamoAggregateError> {
87    values
88        .get(attribute_name)
89        .ok_or(DynamoAggregateError::MissingAttribute(
90            attribute_name.to_string(),
91        ))
92}