redis_objects/
set.rs

1//! Storing collections of unique objects in redis
2
3use std::marker::PhantomData;
4use std::sync::atomic::AtomicI64;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::{cmd, AsyncCommands};
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use tracing::instrument;
12
13use crate::{RedisObjects, ErrorTypes, retry_call};
14
/// Lua script executed on the redis server: removes the member given in
/// `ARGV[1]` from the set named by `KEYS[1]`, then returns the cardinality of
/// that set after the removal.
const DROP_CARD_SCRIPT: &str = r#"
local set_name = KEYS[1]
local key = ARGV[1]

redis.call('srem', set_name, key)
return redis.call('scard', set_name)
"#;
22
/// Lua script executed on the redis server: adds the member given in `ARGV[1]`
/// to the set named by `KEYS[1]`, but only while the set's current cardinality
/// is below the numeric limit given in `ARGV[2]`.
///
/// Returns `true` when `sadd` was issued and `false` otherwise. The check looks
/// only at the size before the add; it does not test whether the member is
/// already present.
const LIMITED_ADD: &str = r#"
local set_name = KEYS[1]
local key = ARGV[1]
local limit = tonumber(ARGV[2])

if redis.call('scard', set_name) < limit then
    redis.call('sadd', set_name, key)
    return true
end
return false
"#;
34
/// A collection of unique values in a redis object.
///
/// Values of type `T` are stored as their JSON encoding.
pub struct Set<T> {
    /// Redis key under which the set is stored.
    name: String,
    /// Shared store whose connection pool is used for every command.
    store: Arc<RedisObjects>,
    /// Script that removes a member and reports the remaining cardinality.
    drop_card_script: redis::Script,
    /// Script that adds a member only while the set is below a size limit.
    limited_add: redis::Script,
    /// Optional time-to-live applied to the key, in whole seconds.
    ttl: Option<i64>,
    /// Unix timestamp (seconds) of the last expiry refresh issued through this
    /// handle; starts at `i64::MIN` so the first refresh always happens.
    last_expire_time: AtomicI64,
    /// Marker tying the handle to its element type without storing a `T`.
    _data: PhantomData<T>
}
45
46impl<T> std::fmt::Debug for Set<T> {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("Set").field("name", &self.name).field("store", &self.store).finish()
49    }
50}
51
52impl<T: Serialize + DeserializeOwned> Set<T> {
53    pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
54        Self {
55            name,
56            store,
57            drop_card_script: redis::Script::new(DROP_CARD_SCRIPT),
58            limited_add: redis::Script::new(LIMITED_ADD),
59            ttl: ttl.map(|v| v.as_secs() as i64),
60            last_expire_time: AtomicI64::new(i64::MIN),
61            _data: PhantomData,
62        }
63    }
64
65    /// set expiry on the remote object if it has not been recently set
66    async fn _conditional_expire(&self) -> Result<(), ErrorTypes> {
67        if let Some(ttl) = self.ttl {
68            let ctime = chrono::Utc::now().timestamp();
69            let last_expire_time: i64 = self.last_expire_time.load(std::sync::atomic::Ordering::Acquire);
70            if ctime > last_expire_time + (ttl / 2) {
71                let _: () = retry_call!(self.store.pool, expire, &self.name, ttl)?;
72                self.last_expire_time.store(ctime, std::sync::atomic::Ordering::Release);
73            }
74        }
75        Ok(())
76    }
77
78    /// Insert an item into the set. 
79    /// Return whether the item is new to the set.
80    #[instrument(skip(value))]
81    pub async fn add(&self, value: &T) -> Result<bool, ErrorTypes> {
82        let data = serde_json::to_vec(&value)?;
83        let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
84        self._conditional_expire().await?;
85        Ok(result)
86    }    
87
88    /// Insert a batch of items to the set.
89    /// Return how many items were new to the set.
90    #[instrument(skip(values))]
91    pub async fn add_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
92        let mut data = vec![];
93        for item in values {
94            data.push(serde_json::to_vec(&item)?);
95        }
96        let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
97        self._conditional_expire().await?;
98        Ok(result)
99    }
100
101    /// Add a single value to the set, but only if that wouldn't make the set grow past a given size.
102    #[instrument(skip(value))]
103    pub async fn limited_add(&self, value: &T, size_limit: usize) -> Result<bool, ErrorTypes> {
104        let data = serde_json::to_vec(&value)?;
105        let result = retry_call!(method, self.store.pool, 
106            self.limited_add.key(&self.name).arg(&data).arg(size_limit), invoke_async)?;
107        self._conditional_expire().await?;
108        Ok(result)
109    }
110
111    /// Check if an item is in the set
112    #[instrument(skip(value))]
113    pub async fn exist(&self, value: &T) -> Result<bool, ErrorTypes> {
114        let data = serde_json::to_vec(&value)?;
115        retry_call!(self.store.pool, sismember, &self.name, &data)
116    }
117
118    /// Read the number of items in the set
119    #[instrument]
120    pub async fn length(&self) -> Result<u64, ErrorTypes> {
121        retry_call!(self.store.pool, scard, &self.name)
122    }
123
124    /// Read the entire content of the set
125    #[instrument]
126    pub async fn members(&self) -> Result<Vec<T>, ErrorTypes> {
127        let data: Vec<Vec<u8>> = retry_call!(self.store.pool, smembers, &self.name)?;
128        Ok(data.into_iter()
129            .map(|v| serde_json::from_slice::<T>(&v))
130            .collect::<Result<Vec<T>, _>>()?)
131    }
132
133    /// Try to remove a given value from the set and return if any change has been made.
134    #[instrument(skip(value))]
135    pub async fn remove(&self, value: &T) -> Result<bool, ErrorTypes> {
136        let data = serde_json::to_vec(&value)?;
137        retry_call!(self.store.pool, srem, &self.name, &data)
138    }
139
140    /// Try to remove multiple items from the set.
141    /// Return how many items were removed.
142    #[instrument(skip(values))]
143    pub async fn remove_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
144        let mut data = vec![];
145        for item in values {
146            data.push(serde_json::to_vec(&item)?);
147        }
148        retry_call!(self.store.pool, srem, &self.name, &data)
149    }
150
151    /// Remove a given value from the set and return the new size of the set.
152    #[instrument(skip(value))]
153    pub async fn drop(&self, value: &T) -> Result<usize, ErrorTypes> {
154        let data = serde_json::to_vec(&value)?;
155        let size: Option<usize> = retry_call!(method, self.store.pool, 
156            self.drop_card_script.key(&self.name).arg(&data), invoke_async)?;
157        Ok(size.unwrap_or_default())
158    }
159
160    /// Remove and return a random item from the set.
161    #[instrument]
162    pub async fn random(&self) -> Result<Option<T>, ErrorTypes>{
163        let ret_val: Option<Vec<u8>> = retry_call!(self.store.pool, srandmember, &self.name)?;
164        match ret_val {
165            Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
166            None => Ok(None),
167        }
168    }
169
170    // pub async fn random(&self, num=None) -> Result<Option<T>, ErrorTypes>{
171    //     ret_val = retry_call(self.c.srandmember, self.name, num)
172    //     if isinstance(ret_val, list):
173    //         return [json.loads(s) for s in ret_val]
174    //     else:
175    //         return json.loads(ret_val)
176    // }
177
178    /// Remove and return a single value from the set.
179    #[instrument]
180    pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
181        let data: Option<Vec<u8>> = retry_call!(self.store.pool, spop, &self.name)?;
182        match data {
183            Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
184            None => Ok(None),
185        }
186    }
187
188    /// Remove and return all values from the set.
189    #[instrument]
190    pub async fn pop_all(&self) -> Result<Vec<T>, ErrorTypes> {
191        let length = self.length().await?;
192        let mut command = cmd("SPOP");
193        let command = command.arg(&self.name).arg(length);
194        let data: Vec<Vec<u8>> = retry_call!(method, self.store.pool, command, query_async)?;
195        Ok(data.into_iter()
196            .map(|v| serde_json::from_slice::<T>(&v))
197            .collect::<Result<Vec<T>, _>>()?)
198    }
199
200    /// Remove and drop all values from the set.
201    #[instrument]
202    pub async fn delete(&self) -> Result<(), ErrorTypes> {
203        retry_call!(self.store.pool, del, &self.name)
204    }
205}
206