Skip to main content

atomr_persistence_azure/
snapshot.rs

1//! Azure Table Storage `SnapshotStore`.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
7
8use crate::config::AzureConfig;
9use crate::entities::SnapshotEntity;
10use crate::rest::TableClient;
11
12pub struct AzureSnapshotStore {
13    client: TableClient,
14    cfg: AzureConfig,
15}
16
17impl AzureSnapshotStore {
18    pub async fn connect(cfg: AzureConfig) -> Result<Arc<Self>, JournalError> {
19        let client = TableClient::new(&cfg.endpoint, &cfg.account, &cfg.key)?;
20        if cfg.auto_create_tables {
21            client.create_table_if_absent(&cfg.snapshot_table).await?;
22        }
23        Ok(Arc::new(Self { client, cfg }))
24    }
25
26    pub fn config(&self) -> &AzureConfig {
27        &self.cfg
28    }
29}
30
/// Doubles every single quote in `pid` so the value can be embedded inside a
/// single-quoted string literal of a table query filter.
///
/// All other characters are copied through unchanged.
fn escape_pk(pid: &str) -> String {
    let mut escaped = String::with_capacity(pid.len());
    for ch in pid.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
34
35#[async_trait]
36impl SnapshotStore for AzureSnapshotStore {
37    async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
38        let entity = SnapshotEntity::from_meta(&meta, &payload);
39        let _ = self
40            .client
41            .upsert_entity(
42                &self.cfg.snapshot_table,
43                &entity.partition_key.clone(),
44                &entity.row_key.clone(),
45                &entity,
46            )
47            .await;
48    }
49
50    async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
51        let pk = escape_pk(persistence_id);
52        let filter = format!("PartitionKey eq '{pk}'");
53        let mut entities: Vec<SnapshotEntity> =
54            self.client.query_entities(&self.cfg.snapshot_table, &filter, None).await.ok()?;
55        entities.sort_by_key(|e| std::cmp::Reverse(e.sequence_nr));
56        let entity = entities.into_iter().next()?;
57        Some(entity.into_parts())
58    }
59
60    async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
61        let pk = escape_pk(persistence_id);
62        let filter = format!("PartitionKey eq '{pk}' and SequenceNr le {to}", to = to_sequence_nr as i64,);
63        let entities: Vec<SnapshotEntity> =
64            match self.client.query_entities(&self.cfg.snapshot_table, &filter, None).await {
65                Ok(e) => e,
66                Err(_) => return,
67            };
68        for entity in entities {
69            let _ = self
70                .client
71                .delete_entity(&self.cfg.snapshot_table, &entity.partition_key, &entity.row_key)
72                .await;
73        }
74    }
75}