1use std::marker::PhantomData;
4use std::sync::atomic::AtomicI64;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::{cmd, AsyncCommands};
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use tracing::instrument;
12
13use crate::{RedisObjects, ErrorTypes, retry_call};
14
/// Lua script that removes one member from a set and reports how many members remain.
///
/// * `KEYS[1]` - name of the set.
/// * `ARGV[1]` - member to remove.
///
/// The script issues `SREM` followed by `SCARD` and returns the `SCARD` reply, so the
/// removal and the size read happen within a single script invocation.
const DROP_CARD_SCRIPT: &str = r#"
local set_name = KEYS[1]
local key = ARGV[1]

redis.call('srem', set_name, key)
return redis.call('scard', set_name)
"#;
22
/// Lua script that adds a member to a set only while the set is below a size limit.
///
/// * `KEYS[1]` - name of the set.
/// * `ARGV[1]` - member to add.
/// * `ARGV[2]` - size limit, parsed with `tonumber`.
///
/// When `SCARD` is strictly less than the limit the script runs `SADD` and returns `true`;
/// otherwise it leaves the set untouched and returns `false`. The size check and the insert
/// happen within a single script invocation.
const LIMITED_ADD: &str = r#"
local set_name = KEYS[1]
local key = ARGV[1]
local limit = tonumber(ARGV[2])

if redis.call('scard', set_name) < limit then
    redis.call('sadd', set_name, key)
    return true
end
return false
"#;
34
35pub struct Set<T> {
37 name: String,
38 store: Arc<RedisObjects>,
39 drop_card_script: redis::Script,
40 limited_add: redis::Script,
41 ttl: Option<i64>,
42 last_expire_time: AtomicI64,
43 _data: PhantomData<T>
44}
45
46impl<T> std::fmt::Debug for Set<T> {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("Set").field("name", &self.name).field("store", &self.store).finish()
49 }
50}
51
52impl<T: Serialize + DeserializeOwned> Set<T> {
53 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
54 Self {
55 name,
56 store,
57 drop_card_script: redis::Script::new(DROP_CARD_SCRIPT),
58 limited_add: redis::Script::new(LIMITED_ADD),
59 ttl: ttl.map(|v| v.as_secs() as i64),
60 last_expire_time: AtomicI64::new(i64::MIN),
61 _data: PhantomData,
62 }
63 }
64
65 async fn _conditional_expire(&self) -> Result<(), ErrorTypes> {
67 if let Some(ttl) = self.ttl {
68 let ctime = chrono::Utc::now().timestamp();
69 let last_expire_time: i64 = self.last_expire_time.load(std::sync::atomic::Ordering::Acquire);
70 if ctime > last_expire_time + (ttl / 2) {
71 let _: () = retry_call!(self.store.pool, expire, &self.name, ttl)?;
72 self.last_expire_time.store(ctime, std::sync::atomic::Ordering::Release);
73 }
74 }
75 Ok(())
76 }
77
78 #[instrument(skip(value))]
81 pub async fn add(&self, value: &T) -> Result<bool, ErrorTypes> {
82 let data = serde_json::to_vec(&value)?;
83 let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
84 self._conditional_expire().await?;
85 Ok(result)
86 }
87
88 #[instrument(skip(values))]
91 pub async fn add_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
92 let mut data = vec![];
93 for item in values {
94 data.push(serde_json::to_vec(&item)?);
95 }
96 let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
97 self._conditional_expire().await?;
98 Ok(result)
99 }
100
101 #[instrument(skip(value))]
103 pub async fn limited_add(&self, value: &T, size_limit: usize) -> Result<bool, ErrorTypes> {
104 let data = serde_json::to_vec(&value)?;
105 let result = retry_call!(method, self.store.pool,
106 self.limited_add.key(&self.name).arg(&data).arg(size_limit), invoke_async)?;
107 self._conditional_expire().await?;
108 Ok(result)
109 }
110
111 #[instrument(skip(value))]
113 pub async fn exist(&self, value: &T) -> Result<bool, ErrorTypes> {
114 let data = serde_json::to_vec(&value)?;
115 retry_call!(self.store.pool, sismember, &self.name, &data)
116 }
117
118 #[instrument]
120 pub async fn length(&self) -> Result<u64, ErrorTypes> {
121 retry_call!(self.store.pool, scard, &self.name)
122 }
123
124 #[instrument]
126 pub async fn members(&self) -> Result<Vec<T>, ErrorTypes> {
127 let data: Vec<Vec<u8>> = retry_call!(self.store.pool, smembers, &self.name)?;
128 Ok(data.into_iter()
129 .map(|v| serde_json::from_slice::<T>(&v))
130 .collect::<Result<Vec<T>, _>>()?)
131 }
132
133 #[instrument(skip(value))]
135 pub async fn remove(&self, value: &T) -> Result<bool, ErrorTypes> {
136 let data = serde_json::to_vec(&value)?;
137 retry_call!(self.store.pool, srem, &self.name, &data)
138 }
139
140 #[instrument(skip(values))]
143 pub async fn remove_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
144 let mut data = vec![];
145 for item in values {
146 data.push(serde_json::to_vec(&item)?);
147 }
148 retry_call!(self.store.pool, srem, &self.name, &data)
149 }
150
151 #[instrument(skip(value))]
153 pub async fn drop(&self, value: &T) -> Result<usize, ErrorTypes> {
154 let data = serde_json::to_vec(&value)?;
155 let size: Option<usize> = retry_call!(method, self.store.pool,
156 self.drop_card_script.key(&self.name).arg(&data), invoke_async)?;
157 Ok(size.unwrap_or_default())
158 }
159
160 #[instrument]
162 pub async fn random(&self) -> Result<Option<T>, ErrorTypes>{
163 let ret_val: Option<Vec<u8>> = retry_call!(self.store.pool, srandmember, &self.name)?;
164 match ret_val {
165 Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
166 None => Ok(None),
167 }
168 }
169
170 #[instrument]
180 pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
181 let data: Option<Vec<u8>> = retry_call!(self.store.pool, spop, &self.name)?;
182 match data {
183 Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
184 None => Ok(None),
185 }
186 }
187
188 #[instrument]
190 pub async fn pop_all(&self) -> Result<Vec<T>, ErrorTypes> {
191 let length = self.length().await?;
192 let mut command = cmd("SPOP");
193 let command = command.arg(&self.name).arg(length);
194 let data: Vec<Vec<u8>> = retry_call!(method, self.store.pool, command, query_async)?;
195 Ok(data.into_iter()
196 .map(|v| serde_json::from_slice::<T>(&v))
197 .collect::<Result<Vec<T>, _>>()?)
198 }
199
200 #[instrument]
202 pub async fn delete(&self) -> Result<(), ErrorTypes> {
203 retry_call!(self.store.pool, del, &self.name)
204 }
205}
206