1#[derive(Clone)]
3pub struct RedisSet {
4 manager: redis::aio::ConnectionManager,
5 key: String,
6 ttl_seconds: usize,
7}
8
9impl RedisSet {
10 pub async fn new(url: &str, key: &str, ttl_seconds: u64) -> anyhow::Result<Self> {
11 Self::new_with_name(url, key, ttl_seconds, None).await
12 }
13
14 pub async fn new_with_name(
15 url: &str,
16 key: &str,
17 ttl_seconds: u64,
18 connection_name: Option<&str>,
19 ) -> anyhow::Result<Self> {
20 let client = redis::Client::open(url)?;
21 let mut manager = client.get_connection_manager().await?;
22
23 if let Some(name) = connection_name {
25 redis::cmd("CLIENT")
26 .arg("SETNAME")
27 .arg(name)
28 .query_async::<()>(&mut manager)
29 .await?;
30 }
31
32 Ok(Self {
33 manager,
34 key: key.to_owned(),
35 ttl_seconds: ttl_seconds as usize,
36 })
37 }
38
39 pub async fn new_sentinel(
41 sentinel_urls: &[&str],
42 master_name: &str,
43 key: &str,
44 ttl_seconds: u64,
45 connection_name: Option<&str>,
46 ) -> anyhow::Result<Self> {
47 use redis::sentinel::Sentinel;
48
49 let mut sentinel = Sentinel::build(sentinel_urls.to_vec())?;
50 let client = sentinel.async_master_for(master_name, None).await?;
51 let mut manager = client.get_connection_manager().await?;
52
53 if let Some(name) = connection_name {
54 redis::cmd("CLIENT")
55 .arg("SETNAME")
56 .arg(name)
57 .query_async::<()>(&mut manager)
58 .await?;
59 }
60
61 Ok(Self {
62 manager,
63 key: key.to_owned(),
64 ttl_seconds: ttl_seconds as usize,
65 })
66 }
67
68 pub async fn add_items(&self, items: &[String]) -> anyhow::Result<()> {
70 if items.is_empty() {
71 return Ok(());
72 }
73 let mut conn = self.manager.clone();
74 let mut pipe = redis::pipe();
75 for item in items {
76 pipe.cmd("SADD").arg(&self.key).arg(item);
77 }
78 pipe.cmd("EXPIRE").arg(&self.key).arg(self.ttl_seconds);
79 pipe.query_async::<()>(&mut conn).await?;
80 Ok(())
81 }
82
83 pub async fn remove_items(&self, items: &[String]) -> anyhow::Result<()> {
85 if items.is_empty() {
86 return Ok(());
87 }
88 let mut conn = self.manager.clone();
89 let mut pipe = redis::pipe();
90 for item in items {
91 pipe.cmd("SREM").arg(&self.key).arg(item);
92 }
93 pipe.query_async::<()>(&mut conn).await?;
94 Ok(())
95 }
96
97 pub async fn add_item(&self, item: &str) -> anyhow::Result<()> {
99 self.add_items(&[item.to_owned()]).await
100 }
101
102 pub async fn remove_item(&self, item: &str) -> anyhow::Result<()> {
104 self.remove_items(&[item.to_owned()]).await
105 }
106
107 pub async fn load_items(&self) -> anyhow::Result<Vec<String>> {
109 let mut conn = self.manager.clone();
110 let entries: Vec<String> = redis::cmd("SMEMBERS")
111 .arg(&self.key)
112 .query_async(&mut conn)
113 .await?;
114 Ok(entries)
115 }
116
117 pub async fn trim_to(&self, max_entries: usize) -> anyhow::Result<()> {
119 if max_entries == 0 {
120 return Ok(());
121 }
122 let mut conn = self.manager.clone();
123 let count: usize = redis::cmd("SCARD")
124 .arg(&self.key)
125 .query_async(&mut conn)
126 .await?;
127 if count > max_entries {
128 let excess = count - max_entries;
129 let _: () = redis::cmd("SPOP")
130 .arg(&self.key)
131 .arg(excess)
132 .query_async(&mut conn)
133 .await?;
134 }
135 Ok(())
136 }
137}