Skip to main content

atomr_persistence_aws/
snapshot.rs

1//! DynamoDB `SnapshotStore` implementation (single-table design).
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
8use aws_sdk_dynamodb::primitives::Blob;
9use aws_sdk_dynamodb::types::AttributeValue;
10use aws_sdk_dynamodb::Client;
11
12use crate::config::DynamoConfig;
13use crate::keys::{parse_sequence, snapshot_sk, SNAPSHOT_PREFIX};
14use crate::schema::ensure_table;
15
16pub struct DynamoSnapshotStore {
17    client: Client,
18    cfg: DynamoConfig,
19}
20
21impl DynamoSnapshotStore {
22    pub async fn connect(cfg: DynamoConfig) -> Result<Arc<Self>, JournalError> {
23        let client = super_build_client(&cfg).await;
24        ensure_table(&client, &cfg).await?;
25        Ok(Arc::new(Self { client, cfg }))
26    }
27
28    pub async fn from_client(client: Client, cfg: DynamoConfig) -> Result<Arc<Self>, JournalError> {
29        ensure_table(&client, &cfg).await?;
30        Ok(Arc::new(Self { client, cfg }))
31    }
32
33    pub fn client(&self) -> &Client {
34        &self.client
35    }
36
37    pub fn config(&self) -> &DynamoConfig {
38        &self.cfg
39    }
40}
41
42async fn super_build_client(cfg: &DynamoConfig) -> Client {
43    let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
44    if let Some(region) = &cfg.region {
45        loader = loader.region(aws_config::Region::new(region.clone()));
46    }
47    let sdk_cfg = loader.load().await;
48    let mut builder = aws_sdk_dynamodb::config::Builder::from(&sdk_cfg);
49    if let Some(endpoint) = &cfg.endpoint_url {
50        builder = builder.endpoint_url(endpoint);
51    }
52    Client::from_conf(builder.build())
53}
54
55#[async_trait]
56impl SnapshotStore for DynamoSnapshotStore {
57    async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
58        let mut item = HashMap::new();
59        item.insert("pid".into(), AttributeValue::S(meta.persistence_id.clone()));
60        item.insert("sk".into(), AttributeValue::S(snapshot_sk(meta.sequence_nr)));
61        item.insert("seq".into(), AttributeValue::N(meta.sequence_nr.to_string()));
62        item.insert("payload".into(), AttributeValue::B(Blob::new(payload)));
63        item.insert("timestamp".into(), AttributeValue::N(meta.timestamp.to_string()));
64        let _ = self.client.put_item().table_name(&self.cfg.table_name).set_item(Some(item)).send().await;
65    }
66
67    async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
68        let out = self
69            .client
70            .query()
71            .table_name(&self.cfg.table_name)
72            .key_condition_expression("#p = :p AND begins_with(#s, :prefix)")
73            .expression_attribute_names("#p", "pid")
74            .expression_attribute_names("#s", "sk")
75            .expression_attribute_values(":p", AttributeValue::S(persistence_id.into()))
76            .expression_attribute_values(":prefix", AttributeValue::S(SNAPSHOT_PREFIX.into()))
77            .scan_index_forward(false)
78            .limit(1)
79            .send()
80            .await
81            .ok()?;
82        let item = out.items().first()?;
83        let sk = item.get("sk")?.as_s().ok()?.clone();
84        let seq = parse_sequence(&sk)?;
85        let payload = item.get("payload")?.as_b().ok()?.as_ref().to_vec();
86        let timestamp = item
87            .get("timestamp")
88            .and_then(|v| v.as_n().ok())
89            .and_then(|s| s.parse::<u64>().ok())
90            .unwrap_or(0);
91        Some((
92            SnapshotMetadata { persistence_id: persistence_id.to_string(), sequence_nr: seq, timestamp },
93            payload,
94        ))
95    }
96
97    async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
98        for seq in 1..=to_sequence_nr {
99            let mut key = HashMap::new();
100            key.insert("pid".into(), AttributeValue::S(persistence_id.into()));
101            key.insert("sk".into(), AttributeValue::S(snapshot_sk(seq)));
102            let _ =
103                self.client.delete_item().table_name(&self.cfg.table_name).set_key(Some(key)).send().await;
104        }
105    }
106}