Skip to main content

atomr_persistence_redis/
snapshot.rs

1//! Snapshot store backed by a Redis sorted set keyed by sequence number.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
7use fred::prelude::*;
8
9use crate::codec::StoredSnapshot;
10use crate::config::RedisConfig;
11
12pub struct RedisSnapshotStore {
13    client: Pool,
14    cfg: RedisConfig,
15}
16
17impl RedisSnapshotStore {
18    pub async fn connect(cfg: RedisConfig) -> Result<Arc<Self>, JournalError> {
19        let builder = Builder::from_config(Config::from_url(&cfg.url).map_err(JournalError::backend)?);
20        let pool = builder.build_pool(cfg.pool_size).map_err(JournalError::backend)?;
21        pool.init().await.map_err(JournalError::backend)?;
22        Ok(Arc::new(Self { client: pool, cfg }))
23    }
24
25    pub fn from_pool(pool: Pool, cfg: RedisConfig) -> Arc<Self> {
26        Arc::new(Self { client: pool, cfg })
27    }
28
29    pub fn config(&self) -> &RedisConfig {
30        &self.cfg
31    }
32
33    pub fn client(&self) -> &Pool {
34        &self.client
35    }
36}
37
38#[async_trait]
39impl SnapshotStore for RedisSnapshotStore {
40    async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
41        let key = self.cfg.snapshot_key(&meta.persistence_id);
42        let stored = StoredSnapshot::new(&meta, &payload);
43        let raw = match serde_json::to_string(&stored) {
44            Ok(v) => v,
45            Err(e) => {
46                tracing::warn!(error = %e, "snapshot encode failed");
47                return;
48            }
49        };
50        let _: Result<(), _> =
51            self.client.zadd(&key, None, None, false, false, (meta.sequence_nr as f64, raw)).await;
52    }
53
54    async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
55        let key = self.cfg.snapshot_key(persistence_id);
56        let res: Result<Vec<(String, f64)>, _> =
57            self.client.zrange(&key, -1, -1, None, false, None, true).await;
58        let members = res.ok()?;
59        let (raw, _) = members.into_iter().next()?;
60        let stored: StoredSnapshot = serde_json::from_str(&raw).ok()?;
61        Some(stored.into_parts())
62    }
63
64    async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
65        let key = self.cfg.snapshot_key(persistence_id);
66        let _: Result<i64, _> = self.client.zremrangebyscore(&key, 0.0, to_sequence_nr as f64).await;
67    }
68}