1use std::marker::PhantomData;
4use std::sync::atomic::AtomicI64;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::{cmd, AsyncCommands};
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11
12use crate::{RedisObjects, ErrorTypes, retry_call};
13
14const DROP_CARD_SCRIPT: &str = r#"
15local set_name = KEYS[1]
16local key = ARGV[1]
17
18redis.call('srem', set_name, key)
19return redis.call('scard', set_name)
20"#;
21
22const LIMITED_ADD: &str = r#"
23local set_name = KEYS[1]
24local key = ARGV[1]
25local limit = tonumber(ARGV[2])
26
27if redis.call('scard', set_name) < limit then
28 redis.call('sadd', set_name, key)
29 return true
30end
31return false
32"#;
33
34pub struct Set<T> {
36 name: String,
37 store: Arc<RedisObjects>,
38 drop_card_script: redis::Script,
39 limited_add: redis::Script,
40 ttl: Option<i64>,
41 last_expire_time: AtomicI64,
42 _data: PhantomData<T>
43}
44
45impl<T: Serialize + DeserializeOwned> Set<T> {
46 pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
47 Self {
48 name,
49 store,
50 drop_card_script: redis::Script::new(DROP_CARD_SCRIPT),
51 limited_add: redis::Script::new(LIMITED_ADD),
52 ttl: ttl.map(|v| v.as_secs() as i64),
53 last_expire_time: AtomicI64::new(i64::MIN),
54 _data: PhantomData,
55 }
56 }
57
58 async fn _conditional_expire(&self) -> Result<(), ErrorTypes> {
60 if let Some(ttl) = self.ttl {
61 let ctime = chrono::Utc::now().timestamp();
62 let last_expire_time: i64 = self.last_expire_time.load(std::sync::atomic::Ordering::Acquire);
63 if ctime > last_expire_time + (ttl / 2) {
64 let _: () = retry_call!(self.store.pool, expire, &self.name, ttl)?;
65 self.last_expire_time.store(ctime, std::sync::atomic::Ordering::Release);
66 }
67 }
68 Ok(())
69 }
70
71 pub async fn add(&self, value: &T) -> Result<bool, ErrorTypes> {
74 let data = serde_json::to_vec(&value)?;
75 let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
76 self._conditional_expire().await?;
77 Ok(result)
78 }
79
80 pub async fn add_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
83 let mut data = vec![];
84 for item in values {
85 data.push(serde_json::to_vec(&item)?);
86 }
87 let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
88 self._conditional_expire().await?;
89 Ok(result)
90 }
91
92 pub async fn limited_add(&self, value: &T, size_limit: usize) -> Result<bool, ErrorTypes> {
94 let data = serde_json::to_vec(&value)?;
95 let result = retry_call!(method, self.store.pool,
96 self.limited_add.key(&self.name).arg(&data).arg(size_limit), invoke_async)?;
97 self._conditional_expire().await?;
98 Ok(result)
99 }
100
101 pub async fn exist(&self, value: &T) -> Result<bool, ErrorTypes> {
103 let data = serde_json::to_vec(&value)?;
104 retry_call!(self.store.pool, sismember, &self.name, &data)
105 }
106
107 pub async fn length(&self) -> Result<u64, ErrorTypes> {
109 retry_call!(self.store.pool, scard, &self.name)
110 }
111
112 pub async fn members(&self) -> Result<Vec<T>, ErrorTypes> {
114 let data: Vec<Vec<u8>> = retry_call!(self.store.pool, smembers, &self.name)?;
115 Ok(data.into_iter()
116 .map(|v| serde_json::from_slice::<T>(&v))
117 .collect::<Result<Vec<T>, _>>()?)
118 }
119
120 pub async fn remove(&self, value: &T) -> Result<bool, ErrorTypes> {
122 let data = serde_json::to_vec(&value)?;
123 retry_call!(self.store.pool, srem, &self.name, &data)
124 }
125
126 pub async fn remove_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
129 let mut data = vec![];
130 for item in values {
131 data.push(serde_json::to_vec(&item)?);
132 }
133 retry_call!(self.store.pool, srem, &self.name, &data)
134 }
135
136 pub async fn drop(&self, value: &T) -> Result<usize, ErrorTypes> {
138 let data = serde_json::to_vec(&value)?;
139 let size: Option<usize> = retry_call!(method, self.store.pool,
140 self.drop_card_script.key(&self.name).arg(&data), invoke_async)?;
141 Ok(size.unwrap_or_default())
142 }
143
144 pub async fn random(&self) -> Result<Option<T>, ErrorTypes>{
146 let ret_val: Option<Vec<u8>> = retry_call!(self.store.pool, srandmember, &self.name)?;
147 match ret_val {
148 Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
149 None => Ok(None),
150 }
151 }
152
153 pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
163 let data: Option<Vec<u8>> = retry_call!(self.store.pool, spop, &self.name)?;
164 match data {
165 Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
166 None => Ok(None),
167 }
168 }
169
170 pub async fn pop_all(&self) -> Result<Vec<T>, ErrorTypes> {
172 let length = self.length().await?;
173 let mut command = cmd("SPOP");
174 let command = command.arg(&self.name).arg(length);
175 let data: Vec<Vec<u8>> = retry_call!(method, self.store.pool, command, query_async)?;
176 Ok(data.into_iter()
177 .map(|v| serde_json::from_slice::<T>(&v))
178 .collect::<Result<Vec<T>, _>>()?)
179 }
180
181 pub async fn delete(&self) -> Result<(), ErrorTypes> {
183 retry_call!(self.store.pool, del, &self.name)
184 }
185}
186