atomr_persistence_aws/
snapshot.rs1use std::collections::HashMap;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
8use aws_sdk_dynamodb::primitives::Blob;
9use aws_sdk_dynamodb::types::AttributeValue;
10use aws_sdk_dynamodb::Client;
11
12use crate::config::DynamoConfig;
13use crate::keys::{parse_sequence, snapshot_sk, SNAPSHOT_PREFIX};
14use crate::schema::ensure_table;
15
16pub struct DynamoSnapshotStore {
17 client: Client,
18 cfg: DynamoConfig,
19}
20
21impl DynamoSnapshotStore {
22 pub async fn connect(cfg: DynamoConfig) -> Result<Arc<Self>, JournalError> {
23 let client = super_build_client(&cfg).await;
24 ensure_table(&client, &cfg).await?;
25 Ok(Arc::new(Self { client, cfg }))
26 }
27
28 pub async fn from_client(client: Client, cfg: DynamoConfig) -> Result<Arc<Self>, JournalError> {
29 ensure_table(&client, &cfg).await?;
30 Ok(Arc::new(Self { client, cfg }))
31 }
32
33 pub fn client(&self) -> &Client {
34 &self.client
35 }
36
37 pub fn config(&self) -> &DynamoConfig {
38 &self.cfg
39 }
40}
41
42async fn super_build_client(cfg: &DynamoConfig) -> Client {
43 let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
44 if let Some(region) = &cfg.region {
45 loader = loader.region(aws_config::Region::new(region.clone()));
46 }
47 let sdk_cfg = loader.load().await;
48 let mut builder = aws_sdk_dynamodb::config::Builder::from(&sdk_cfg);
49 if let Some(endpoint) = &cfg.endpoint_url {
50 builder = builder.endpoint_url(endpoint);
51 }
52 Client::from_conf(builder.build())
53}
54
55#[async_trait]
56impl SnapshotStore for DynamoSnapshotStore {
57 async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
58 let mut item = HashMap::new();
59 item.insert("pid".into(), AttributeValue::S(meta.persistence_id.clone()));
60 item.insert("sk".into(), AttributeValue::S(snapshot_sk(meta.sequence_nr)));
61 item.insert("seq".into(), AttributeValue::N(meta.sequence_nr.to_string()));
62 item.insert("payload".into(), AttributeValue::B(Blob::new(payload)));
63 item.insert("timestamp".into(), AttributeValue::N(meta.timestamp.to_string()));
64 let _ = self.client.put_item().table_name(&self.cfg.table_name).set_item(Some(item)).send().await;
65 }
66
67 async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
68 let out = self
69 .client
70 .query()
71 .table_name(&self.cfg.table_name)
72 .key_condition_expression("#p = :p AND begins_with(#s, :prefix)")
73 .expression_attribute_names("#p", "pid")
74 .expression_attribute_names("#s", "sk")
75 .expression_attribute_values(":p", AttributeValue::S(persistence_id.into()))
76 .expression_attribute_values(":prefix", AttributeValue::S(SNAPSHOT_PREFIX.into()))
77 .scan_index_forward(false)
78 .limit(1)
79 .send()
80 .await
81 .ok()?;
82 let item = out.items().first()?;
83 let sk = item.get("sk")?.as_s().ok()?.clone();
84 let seq = parse_sequence(&sk)?;
85 let payload = item.get("payload")?.as_b().ok()?.as_ref().to_vec();
86 let timestamp = item
87 .get("timestamp")
88 .and_then(|v| v.as_n().ok())
89 .and_then(|s| s.parse::<u64>().ok())
90 .unwrap_or(0);
91 Some((
92 SnapshotMetadata { persistence_id: persistence_id.to_string(), sequence_nr: seq, timestamp },
93 payload,
94 ))
95 }
96
97 async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
98 for seq in 1..=to_sequence_nr {
99 let mut key = HashMap::new();
100 key.insert("pid".into(), AttributeValue::S(persistence_id.into()));
101 key.insert("sk".into(), AttributeValue::S(snapshot_sk(seq)));
102 let _ =
103 self.client.delete_item().table_name(&self.cfg.table_name).set_key(Some(key)).send().await;
104 }
105 }
106}