atomr_persistence_redis/
snapshot.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use atomr_persistence::{JournalError, SnapshotMetadata, SnapshotStore};
7use fred::prelude::*;
8
9use crate::codec::StoredSnapshot;
10use crate::config::RedisConfig;
11
12pub struct RedisSnapshotStore {
13 client: Pool,
14 cfg: RedisConfig,
15}
16
17impl RedisSnapshotStore {
18 pub async fn connect(cfg: RedisConfig) -> Result<Arc<Self>, JournalError> {
19 let builder = Builder::from_config(Config::from_url(&cfg.url).map_err(JournalError::backend)?);
20 let pool = builder.build_pool(cfg.pool_size).map_err(JournalError::backend)?;
21 pool.init().await.map_err(JournalError::backend)?;
22 Ok(Arc::new(Self { client: pool, cfg }))
23 }
24
25 pub fn from_pool(pool: Pool, cfg: RedisConfig) -> Arc<Self> {
26 Arc::new(Self { client: pool, cfg })
27 }
28
29 pub fn config(&self) -> &RedisConfig {
30 &self.cfg
31 }
32
33 pub fn client(&self) -> &Pool {
34 &self.client
35 }
36}
37
38#[async_trait]
39impl SnapshotStore for RedisSnapshotStore {
40 async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
41 let key = self.cfg.snapshot_key(&meta.persistence_id);
42 let stored = StoredSnapshot::new(&meta, &payload);
43 let raw = match serde_json::to_string(&stored) {
44 Ok(v) => v,
45 Err(e) => {
46 tracing::warn!(error = %e, "snapshot encode failed");
47 return;
48 }
49 };
50 let _: Result<(), _> =
51 self.client.zadd(&key, None, None, false, false, (meta.sequence_nr as f64, raw)).await;
52 }
53
54 async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
55 let key = self.cfg.snapshot_key(persistence_id);
56 let res: Result<Vec<(String, f64)>, _> =
57 self.client.zrange(&key, -1, -1, None, false, None, true).await;
58 let members = res.ok()?;
59 let (raw, _) = members.into_iter().next()?;
60 let stored: StoredSnapshot = serde_json::from_str(&raw).ok()?;
61 Some(stored.into_parts())
62 }
63
64 async fn delete(&self, persistence_id: &str, to_sequence_nr: u64) {
65 let key = self.cfg.snapshot_key(persistence_id);
66 let _: Result<i64, _> = self.client.zremrangebyscore(&key, 0.0, to_sequence_nr as f64).await;
67 }
68}