Skip to main content

mx_cache/
set.rs

/// Minimal Redis set helper with TTL and basic bounding.
///
/// Each value wraps one Redis key holding a set, together with the TTL that is
/// re-applied to that key whenever items are added. The type is `Clone`; a
/// clone carries a clone of the connection manager plus the same key and TTL.
#[derive(Clone)]
pub struct RedisSet {
    /// Connection manager used for every command; methods clone it per call.
    manager: redis::aio::ConnectionManager,
    /// Redis key under which the set is stored.
    key: String,
    /// TTL in seconds, sent as the argument of `EXPIRE` after items are added.
    ttl_seconds: usize,
}
8
9impl RedisSet {
10    pub async fn new(url: &str, key: &str, ttl_seconds: u64) -> anyhow::Result<Self> {
11        Self::new_with_name(url, key, ttl_seconds, None).await
12    }
13
14    pub async fn new_with_name(
15        url: &str,
16        key: &str,
17        ttl_seconds: u64,
18        connection_name: Option<&str>,
19    ) -> anyhow::Result<Self> {
20        let client = redis::Client::open(url)?;
21        let mut manager = client.get_connection_manager().await?;
22
23        // Set connection name if provided
24        if let Some(name) = connection_name {
25            redis::cmd("CLIENT")
26                .arg("SETNAME")
27                .arg(name)
28                .query_async::<()>(&mut manager)
29                .await?;
30        }
31
32        Ok(Self {
33            manager,
34            key: key.to_owned(),
35            ttl_seconds: ttl_seconds as usize,
36        })
37    }
38
39    /// Creates a new `RedisSet` connected via Sentinel.
40    pub async fn new_sentinel(
41        sentinel_urls: &[&str],
42        master_name: &str,
43        key: &str,
44        ttl_seconds: u64,
45        connection_name: Option<&str>,
46    ) -> anyhow::Result<Self> {
47        use redis::sentinel::Sentinel;
48
49        let mut sentinel = Sentinel::build(sentinel_urls.to_vec())?;
50        let client = sentinel.async_master_for(master_name, None).await?;
51        let mut manager = client.get_connection_manager().await?;
52
53        if let Some(name) = connection_name {
54            redis::cmd("CLIENT")
55                .arg("SETNAME")
56                .arg(name)
57                .query_async::<()>(&mut manager)
58                .await?;
59        }
60
61        Ok(Self {
62            manager,
63            key: key.to_owned(),
64            ttl_seconds: ttl_seconds as usize,
65        })
66    }
67
68    /// Adds items to the set and refreshes TTL.
69    pub async fn add_items(&self, items: &[String]) -> anyhow::Result<()> {
70        if items.is_empty() {
71            return Ok(());
72        }
73        let mut conn = self.manager.clone();
74        let mut pipe = redis::pipe();
75        for item in items {
76            pipe.cmd("SADD").arg(&self.key).arg(item);
77        }
78        pipe.cmd("EXPIRE").arg(&self.key).arg(self.ttl_seconds);
79        pipe.query_async::<()>(&mut conn).await?;
80        Ok(())
81    }
82
83    /// Removes items from the set.
84    pub async fn remove_items(&self, items: &[String]) -> anyhow::Result<()> {
85        if items.is_empty() {
86            return Ok(());
87        }
88        let mut conn = self.manager.clone();
89        let mut pipe = redis::pipe();
90        for item in items {
91            pipe.cmd("SREM").arg(&self.key).arg(item);
92        }
93        pipe.query_async::<()>(&mut conn).await?;
94        Ok(())
95    }
96
97    /// Single item add.
98    pub async fn add_item(&self, item: &str) -> anyhow::Result<()> {
99        self.add_items(&[item.to_owned()]).await
100    }
101
102    /// Single item remove.
103    pub async fn remove_item(&self, item: &str) -> anyhow::Result<()> {
104        self.remove_items(&[item.to_owned()]).await
105    }
106
107    /// Returns all items in the set.
108    pub async fn load_items(&self) -> anyhow::Result<Vec<String>> {
109        let mut conn = self.manager.clone();
110        let entries: Vec<String> = redis::cmd("SMEMBERS")
111            .arg(&self.key)
112            .query_async(&mut conn)
113            .await?;
114        Ok(entries)
115    }
116
117    /// Trims the set down to `max_entries` by popping arbitrary entries when necessary.
118    pub async fn trim_to(&self, max_entries: usize) -> anyhow::Result<()> {
119        if max_entries == 0 {
120            return Ok(());
121        }
122        let mut conn = self.manager.clone();
123        let count: usize = redis::cmd("SCARD")
124            .arg(&self.key)
125            .query_async(&mut conn)
126            .await?;
127        if count > max_entries {
128            let excess = count - max_entries;
129            let _: () = redis::cmd("SPOP")
130                .arg(&self.key)
131                .arg(excess)
132                .query_async(&mut conn)
133                .await?;
134        }
135        Ok(())
136    }
137}