atomr_persistence_azure/
snapshot.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
7
8use crate::config::AzureConfig;
9use crate::entities::SnapshotEntity;
10use crate::rest::TableClient;
11
12pub struct AzureSnapshotStore {
13 client: TableClient,
14 cfg: AzureConfig,
15}
16
17impl AzureSnapshotStore {
18 pub async fn connect(cfg: AzureConfig) -> Result<Arc<Self>, JournalError> {
19 let client = TableClient::new(&cfg.endpoint, &cfg.account, &cfg.key)?;
20 if cfg.auto_create_tables {
21 client.create_table_if_absent(&cfg.snapshot_table).await?;
22 }
23 Ok(Arc::new(Self { client, cfg }))
24 }
25
26 pub fn config(&self) -> &AzureConfig {
27 &self.cfg
28 }
29}
30
31fn escape_pk(pid: &str) -> String {
32 pid.replace('\'', "''")
33}
34
35#[async_trait]
36impl SnapshotStore for AzureSnapshotStore {
37 async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
38 let entity = SnapshotEntity::from_meta(&meta, &payload);
39 let _ = self
40 .client
41 .upsert_entity(
42 &self.cfg.snapshot_table,
43 &entity.partition_key.clone(),
44 &entity.row_key.clone(),
45 &entity,
46 )
47 .await;
48 }
49
50 async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
51 let pk = escape_pk(persistence_id);
52 let filter = format!("PartitionKey eq '{pk}'");
53 let mut entities: Vec<SnapshotEntity> =
54 self.client.query_entities(&self.cfg.snapshot_table, &filter, None).await.ok()?;
55 entities.sort_by_key(|e| std::cmp::Reverse(e.sequence_nr));
56 let entity = entities.into_iter().next()?;
57 Some(entity.into_parts())
58 }
59
60 async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
61 let pk = escape_pk(persistence_id);
62 let filter = format!("PartitionKey eq '{pk}' and SequenceNr le {to}", to = to_sequence_nr as i64,);
63 let entities: Vec<SnapshotEntity> =
64 match self.client.query_entities(&self.cfg.snapshot_table, &filter, None).await {
65 Ok(e) => e,
66 Err(_) => return,
67 };
68 for entity in entities {
69 let _ = self
70 .client
71 .delete_entity(&self.cfg.snapshot_table, &entity.partition_key, &entity.row_key)
72 .await;
73 }
74 }
75}