graphddb_runtime 0.7.5

Rust runtime for GraphDDB — interprets the language-neutral IR (manifest.json + operations.json) and executes the validated access patterns against DynamoDB.
Documentation
//! The DynamoDB client abstraction the runtime executes against.
//!
//! A narrow async trait over exactly the operations the runtime issues, so the
//! runtime is testable without a live client (a fake impl records requests /
//! replays responses) and stays decoupled from the concrete SDK — the Rust
//! counterpart of the Python runtime taking any boto3-shaped client. The
//! [`AwsDynamoClient`] adapter wraps a real `aws_sdk_dynamodb::Client`.

use std::collections::HashMap;

use async_trait::async_trait;
use aws_sdk_dynamodb::types::{AttributeValue, KeysAndAttributes, TransactWriteItem, WriteRequest};

use crate::errors::{GraphDDBError, Result};

/// A raw DynamoDB item / key (an `AttributeValue` map).
///
/// Maps an attribute name to its SDK [`AttributeValue`]. The same alias is used
/// in this module for whole items, primary keys, and pagination keys.
pub type Item = HashMap<String, AttributeValue>;

/// A `GetItem` request (a subset of the boto3 request).
///
/// Derives `Default`, so a caller can set only the fields it needs and fill the
/// rest with `..Default::default()`.
#[derive(Debug, Clone, Default)]
pub struct GetItemInput {
    /// Physical table name.
    pub table_name: String,
    /// The primary key.
    pub key: Item,
    /// Strongly-consistent read flag. `false` under `Default`.
    pub consistent_read: bool,
    /// Optional `ProjectionExpression`. `None` leaves the projection unset.
    pub projection_expression: Option<String>,
    /// Optional `ExpressionAttributeNames` (placeholder -> attribute name).
    pub expression_attribute_names: Option<HashMap<String, String>>,
}

/// A `GetItem` response.
///
/// Carries only the returned item; no other response field is kept.
#[derive(Debug, Clone, Default)]
pub struct GetItemOutput {
    /// The item, or `None` when absent.
    pub item: Option<Item>,
}

/// A `Query` request (a subset of the boto3 request).
///
/// Unlike [`GetItemInput`], the expression attribute names and values are plain
/// maps rather than `Option`s. Derives `Default` (empty strings and maps,
/// `None` options, `consistent_read == false`).
#[derive(Debug, Clone, Default)]
pub struct QueryInput {
    /// Physical table name.
    pub table_name: String,
    /// `KeyConditionExpression`.
    pub key_condition_expression: String,
    /// `ExpressionAttributeNames` (placeholder -> attribute name).
    pub expression_attribute_names: HashMap<String, String>,
    /// `ExpressionAttributeValues` (placeholder -> value).
    pub expression_attribute_values: HashMap<String, AttributeValue>,
    /// Optional `IndexName`. `None` leaves the index unset.
    pub index_name: Option<String>,
    /// Optional `FilterExpression`.
    pub filter_expression: Option<String>,
    /// Optional `Limit`.
    pub limit: Option<i32>,
    /// Optional `ExclusiveStartKey`, i.e. where a paginated query resumes.
    pub exclusive_start_key: Option<Item>,
    /// Strongly-consistent read flag. `false` under `Default`.
    pub consistent_read: bool,
}

/// A `Query` response.
///
/// One page of results plus the key needed to request the next page.
#[derive(Debug, Clone, Default)]
pub struct QueryOutput {
    /// The matched items.
    pub items: Vec<Item>,
    /// The `LastEvaluatedKey`, or `None` when exhausted. Feed it back as
    /// [`QueryInput::exclusive_start_key`] to continue.
    pub last_evaluated_key: Option<Item>,
}

/// A `PutItem` / `UpdateItem` / `DeleteItem` response (only `Attributes` matters,
/// captured for the `ALL_OLD` image on the middleware write path).
///
/// Shared by all three single-item write methods of [`DynamoClient`].
#[derive(Debug, Clone, Default)]
pub struct WriteOutput {
    /// The `ReturnValues: ALL_OLD` image, if requested and present.
    pub attributes: Option<Item>,
}

/// The narrow async client the runtime drives.
///
/// Each method corresponds to one DynamoDB API operation. Implementors must be
/// `Send + Sync`. [`AwsDynamoClient`] is the implementation backed by the real
/// SDK client; every method reports failure through the crate's [`Result`].
#[async_trait]
pub trait DynamoClient: Send + Sync {
    /// `GetItem`.
    ///
    /// Reads the item identified by `input.key`; the output's `item` is `None`
    /// when no such item exists.
    async fn get_item(&self, input: GetItemInput) -> Result<GetItemOutput>;
    /// `Query`.
    ///
    /// Returns one page of matches together with the `LastEvaluatedKey` needed
    /// to fetch the next page.
    async fn query(&self, input: QueryInput) -> Result<QueryOutput>;
    /// `PutItem` (raw request body already composed).
    ///
    /// * `table_name` - physical table name.
    /// * `item` - the full item to write.
    /// * `condition_expression` - optional `ConditionExpression`.
    /// * `names` / `values` - optional `ExpressionAttributeNames` /
    ///   `ExpressionAttributeValues` for the condition.
    /// * `return_all_old` - request `ReturnValues: ALL_OLD`; the previous image,
    ///   if any, comes back in [`WriteOutput::attributes`].
    async fn put_item(
        &self,
        table_name: &str,
        item: Item,
        condition_expression: Option<String>,
        names: Option<HashMap<String, String>>,
        values: Option<HashMap<String, AttributeValue>>,
        return_all_old: bool,
    ) -> Result<WriteOutput>;
    /// `UpdateItem`.
    ///
    /// * `table_name` - physical table name.
    /// * `key` - primary key of the item to update.
    /// * `update_expression` - optional `UpdateExpression`.
    /// * `condition_expression` - optional `ConditionExpression`.
    /// * `names` / `values` - optional `ExpressionAttributeNames` /
    ///   `ExpressionAttributeValues` shared by both expressions.
    /// * `return_all_old` - request `ReturnValues: ALL_OLD`; the previous image,
    ///   if any, comes back in [`WriteOutput::attributes`].
    // Eight parameters counting `&self`, one per request field, which exceeds
    // clippy's default argument limit; hence the lint is silenced here.
    #[allow(clippy::too_many_arguments)]
    async fn update_item(
        &self,
        table_name: &str,
        key: Item,
        update_expression: Option<String>,
        condition_expression: Option<String>,
        names: Option<HashMap<String, String>>,
        values: Option<HashMap<String, AttributeValue>>,
        return_all_old: bool,
    ) -> Result<WriteOutput>;
    /// `DeleteItem`.
    ///
    /// * `table_name` - physical table name.
    /// * `key` - primary key of the item to delete.
    /// * `condition_expression` - optional `ConditionExpression`.
    /// * `names` / `values` - optional `ExpressionAttributeNames` /
    ///   `ExpressionAttributeValues` for the condition.
    /// * `return_all_old` - request `ReturnValues: ALL_OLD`; the deleted image,
    ///   if any, comes back in [`WriteOutput::attributes`].
    async fn delete_item(
        &self,
        table_name: &str,
        key: Item,
        condition_expression: Option<String>,
        names: Option<HashMap<String, String>>,
        values: Option<HashMap<String, AttributeValue>>,
        return_all_old: bool,
    ) -> Result<WriteOutput>;
    /// `BatchGetItem` for a single table's chunk. Returns `(responses, unprocessed_keys)`.
    ///
    /// * `table_name` - the one table every key in `keys` belongs to.
    /// * `keys` - primary keys to fetch.
    /// * `projection_expression` - optional `ProjectionExpression`.
    /// * `names` - optional `ExpressionAttributeNames` for the projection.
    ///
    /// The second tuple element holds the keys the service did not process;
    /// this method does not retry them itself.
    async fn batch_get_item(
        &self,
        table_name: &str,
        keys: Vec<Item>,
        projection_expression: Option<String>,
        names: Option<HashMap<String, String>>,
    ) -> Result<(Vec<Item>, Vec<Item>)>;
    /// `BatchWriteItem` for a single table's chunk. Returns the unprocessed requests.
    ///
    /// * `table_name` - the one table every request targets.
    /// * `requests` - the put/delete requests of this chunk.
    ///
    /// This method does not retry the returned unprocessed requests itself.
    async fn batch_write_item(
        &self,
        table_name: &str,
        requests: Vec<WriteRequest>,
    ) -> Result<Vec<WriteRequest>>;
    /// `TransactWriteItems`.
    ///
    /// Submits `items` as one transactional write; returns `Ok(())` on success.
    async fn transact_write_items(&self, items: Vec<TransactWriteItem>) -> Result<()>;
}

/// Adapter wrapping a real `aws_sdk_dynamodb::Client`.
///
/// Implements [`DynamoClient`] by translating each call into the matching SDK
/// request. `Debug` and `Clone` are derived from the wrapped SDK client, which
/// implements both, so the adapter can be logged and handed to several owners.
#[derive(Debug, Clone)]
pub struct AwsDynamoClient {
    /// The configured SDK client every request is issued through.
    client: aws_sdk_dynamodb::Client,
}

impl AwsDynamoClient {
    /// Wrap a configured SDK client.
    pub fn new(client: aws_sdk_dynamodb::Client) -> Self {
        Self { client }
    }
}

fn op_err(op: &str, e: impl std::fmt::Display) -> GraphDDBError {
    GraphDDBError::operation_execution(format!("DynamoDB {op} failed: {e}"))
}

#[async_trait]
impl DynamoClient for AwsDynamoClient {
    async fn get_item(&self, input: GetItemInput) -> Result<GetItemOutput> {
        let mut req = self
            .client
            .get_item()
            .table_name(&input.table_name)
            .set_key(Some(input.key))
            .consistent_read(input.consistent_read);
        if let Some(pe) = input.projection_expression {
            req = req.projection_expression(pe);
        }
        if let Some(names) = input.expression_attribute_names {
            req = req.set_expression_attribute_names(Some(names));
        }
        let resp = req.send().await.map_err(|e| op_err("GetItem", e))?;
        Ok(GetItemOutput { item: resp.item })
    }

    async fn query(&self, input: QueryInput) -> Result<QueryOutput> {
        let mut req = self
            .client
            .query()
            .table_name(&input.table_name)
            .key_condition_expression(&input.key_condition_expression)
            .set_expression_attribute_names(Some(input.expression_attribute_names))
            .set_expression_attribute_values(Some(input.expression_attribute_values))
            .consistent_read(input.consistent_read);
        if let Some(idx) = input.index_name {
            req = req.index_name(idx);
        }
        if let Some(fe) = input.filter_expression {
            req = req.filter_expression(fe);
        }
        if let Some(lim) = input.limit {
            req = req.limit(lim);
        }
        if let Some(esk) = input.exclusive_start_key {
            req = req.set_exclusive_start_key(Some(esk));
        }
        let resp = req.send().await.map_err(|e| op_err("Query", e))?;
        Ok(QueryOutput {
            items: resp.items.unwrap_or_default(),
            last_evaluated_key: resp.last_evaluated_key,
        })
    }

    async fn put_item(
        &self,
        table_name: &str,
        item: Item,
        condition_expression: Option<String>,
        names: Option<HashMap<String, String>>,
        values: Option<HashMap<String, AttributeValue>>,
        return_all_old: bool,
    ) -> Result<WriteOutput> {
        let mut req = self
            .client
            .put_item()
            .table_name(table_name)
            .set_item(Some(item));
        if let Some(ce) = condition_expression {
            req = req.condition_expression(ce);
        }
        if let Some(n) = names {
            req = req.set_expression_attribute_names(Some(n));
        }
        if let Some(v) = values {
            req = req.set_expression_attribute_values(Some(v));
        }
        if return_all_old {
            req = req.return_values(aws_sdk_dynamodb::types::ReturnValue::AllOld);
        }
        let resp = req.send().await.map_err(|e| op_err("PutItem", e))?;
        Ok(WriteOutput {
            attributes: resp.attributes,
        })
    }

    async fn update_item(
        &self,
        table_name: &str,
        key: Item,
        update_expression: Option<String>,
        condition_expression: Option<String>,
        names: Option<HashMap<String, String>>,
        values: Option<HashMap<String, AttributeValue>>,
        return_all_old: bool,
    ) -> Result<WriteOutput> {
        let mut req = self
            .client
            .update_item()
            .table_name(table_name)
            .set_key(Some(key));
        if let Some(ue) = update_expression {
            req = req.update_expression(ue);
        }
        if let Some(ce) = condition_expression {
            req = req.condition_expression(ce);
        }
        if let Some(n) = names {
            req = req.set_expression_attribute_names(Some(n));
        }
        if let Some(v) = values {
            req = req.set_expression_attribute_values(Some(v));
        }
        if return_all_old {
            req = req.return_values(aws_sdk_dynamodb::types::ReturnValue::AllOld);
        }
        let resp = req.send().await.map_err(|e| op_err("UpdateItem", e))?;
        Ok(WriteOutput {
            attributes: resp.attributes,
        })
    }

    async fn delete_item(
        &self,
        table_name: &str,
        key: Item,
        condition_expression: Option<String>,
        names: Option<HashMap<String, String>>,
        values: Option<HashMap<String, AttributeValue>>,
        return_all_old: bool,
    ) -> Result<WriteOutput> {
        let mut req = self
            .client
            .delete_item()
            .table_name(table_name)
            .set_key(Some(key));
        if let Some(ce) = condition_expression {
            req = req.condition_expression(ce);
        }
        if let Some(n) = names {
            req = req.set_expression_attribute_names(Some(n));
        }
        if let Some(v) = values {
            req = req.set_expression_attribute_values(Some(v));
        }
        if return_all_old {
            req = req.return_values(aws_sdk_dynamodb::types::ReturnValue::AllOld);
        }
        let resp = req.send().await.map_err(|e| op_err("DeleteItem", e))?;
        Ok(WriteOutput {
            attributes: resp.attributes,
        })
    }

    async fn batch_get_item(
        &self,
        table_name: &str,
        keys: Vec<Item>,
        projection_expression: Option<String>,
        names: Option<HashMap<String, String>>,
    ) -> Result<(Vec<Item>, Vec<Item>)> {
        let mut kaa = KeysAndAttributes::builder().set_keys(Some(keys));
        if let Some(pe) = projection_expression {
            kaa = kaa.projection_expression(pe);
        }
        if let Some(n) = names {
            kaa = kaa.set_expression_attribute_names(Some(n));
        }
        let kaa = kaa.build().map_err(|e| op_err("BatchGetItem", e))?;
        let resp = self
            .client
            .batch_get_item()
            .request_items(table_name, kaa)
            .send()
            .await
            .map_err(|e| op_err("BatchGetItem", e))?;
        let responses = resp
            .responses
            .and_then(|mut m| m.remove(table_name))
            .unwrap_or_default();
        let unprocessed = resp
            .unprocessed_keys
            .and_then(|mut m| m.remove(table_name))
            .map(|kaa| kaa.keys)
            .unwrap_or_default();
        Ok((responses, unprocessed))
    }

    async fn batch_write_item(
        &self,
        table_name: &str,
        requests: Vec<WriteRequest>,
    ) -> Result<Vec<WriteRequest>> {
        let resp = self
            .client
            .batch_write_item()
            .request_items(table_name, requests)
            .send()
            .await
            .map_err(|e| op_err("BatchWriteItem", e))?;
        Ok(resp
            .unprocessed_items
            .and_then(|mut m| m.remove(table_name))
            .unwrap_or_default())
    }

    async fn transact_write_items(&self, items: Vec<TransactWriteItem>) -> Result<()> {
        self.client
            .transact_write_items()
            .set_transact_items(Some(items))
            .send()
            .await
            .map_err(|e| op_err("TransactWriteItems", e))?;
        Ok(())
    }
}