redis_objects/
set.rs

1//! Storing collections of unique objects in redis
2
3use std::marker::PhantomData;
4use std::sync::atomic::AtomicI64;
5use std::sync::Arc;
6use std::time::Duration;
7
8use redis::{cmd, AsyncCommands};
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11
12use crate::{RedisObjects, ErrorTypes, retry_call};
13
/// Lua script that removes one member from a set and returns the set's
/// cardinality after the removal.
///
/// `KEYS[1]` is the set key and `ARGV[1]` is the member to remove.
14const DROP_CARD_SCRIPT: &str = r#"
15local set_name = KEYS[1]
16local key = ARGV[1]
17
18redis.call('srem', set_name, key)
19return redis.call('scard', set_name)
20"#;
21
/// Lua script that adds a member to a set only while the set's cardinality
/// is strictly below a limit.
///
/// `KEYS[1]` is the set key, `ARGV[1]` the member and `ARGV[2]` the limit.
/// The script returns true after issuing the `sadd` (even if the member was
/// already present) and false when the set already holds `limit` or more members.
22const LIMITED_ADD: &str = r#"
23local set_name = KEYS[1]
24local key = ARGV[1]
25local limit = tonumber(ARGV[2])
26
27if redis.call('scard', set_name) < limit then
28    redis.call('sadd', set_name, key)
29    return true
30end
31return false
32"#;
33
34/// A collection of unique values in a redis object
35pub struct Set<T> {
36    name: String,
37    store: Arc<RedisObjects>,
38    drop_card_script: redis::Script,
39    limited_add: redis::Script,
40    ttl: Option<i64>,
41    last_expire_time: AtomicI64,
42    _data: PhantomData<T>
43}
44
45impl<T: Serialize + DeserializeOwned> Set<T> {
46    pub (crate) fn new(name: String, store: Arc<RedisObjects>, ttl: Option<Duration>) -> Self {
47        Self {
48            name,
49            store,
50            drop_card_script: redis::Script::new(DROP_CARD_SCRIPT),
51            limited_add: redis::Script::new(LIMITED_ADD),
52            ttl: ttl.map(|v| v.as_secs() as i64),
53            last_expire_time: AtomicI64::new(i64::MIN),
54            _data: PhantomData,
55        }
56    }
57
58    /// set expiry on the remote object if it has not been recently set
59    async fn _conditional_expire(&self) -> Result<(), ErrorTypes> {
60        if let Some(ttl) = self.ttl {
61            let ctime = chrono::Utc::now().timestamp();
62            let last_expire_time: i64 = self.last_expire_time.load(std::sync::atomic::Ordering::Acquire);
63            if ctime > last_expire_time + (ttl / 2) {
64                let _: () = retry_call!(self.store.pool, expire, &self.name, ttl)?;
65                self.last_expire_time.store(ctime, std::sync::atomic::Ordering::Release);
66            }
67        }
68        Ok(())
69    }
70
71    /// Insert an item into the set. 
72    /// Return whether the item is new to the set.
73    pub async fn add(&self, value: &T) -> Result<bool, ErrorTypes> {
74        let data = serde_json::to_vec(&value)?;
75        let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
76        self._conditional_expire().await?;
77        Ok(result)
78    }    
79
80    /// Insert a batch of items to the set.
81    /// Return how many items were new to the set.
82    pub async fn add_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
83        let mut data = vec![];
84        for item in values {
85            data.push(serde_json::to_vec(&item)?);
86        }
87        let result = retry_call!(self.store.pool, sadd, &self.name, &data)?;
88        self._conditional_expire().await?;
89        Ok(result)
90    }
91
92    /// Add a single value to the set, but only if that wouldn't make the set grow past a given size.
93    pub async fn limited_add(&self, value: &T, size_limit: usize) -> Result<bool, ErrorTypes> {
94        let data = serde_json::to_vec(&value)?;
95        let result = retry_call!(method, self.store.pool, 
96            self.limited_add.key(&self.name).arg(&data).arg(size_limit), invoke_async)?;
97        self._conditional_expire().await?;
98        Ok(result)
99    }
100
101    /// Check if an item is in the set
102    pub async fn exist(&self, value: &T) -> Result<bool, ErrorTypes> {
103        let data = serde_json::to_vec(&value)?;
104        retry_call!(self.store.pool, sismember, &self.name, &data)
105    }
106
107    /// Read the number of items in the set
108    pub async fn length(&self) -> Result<u64, ErrorTypes> {
109        retry_call!(self.store.pool, scard, &self.name)
110    }
111
112    /// Read the entire content of the set
113    pub async fn members(&self) -> Result<Vec<T>, ErrorTypes> {
114        let data: Vec<Vec<u8>> = retry_call!(self.store.pool, smembers, &self.name)?;
115        Ok(data.into_iter()
116            .map(|v| serde_json::from_slice::<T>(&v))
117            .collect::<Result<Vec<T>, _>>()?)
118    }
119
120    /// Try to remove a given value from the set and return if any change has been made.
121    pub async fn remove(&self, value: &T) -> Result<bool, ErrorTypes> {
122        let data = serde_json::to_vec(&value)?;
123        retry_call!(self.store.pool, srem, &self.name, &data)
124    }
125
126    /// Try to remove multiple items from the set.
127    /// Return how many items were removed.
128    pub async fn remove_batch(&self, values: &[T]) -> Result<usize, ErrorTypes> {
129        let mut data = vec![];
130        for item in values {
131            data.push(serde_json::to_vec(&item)?);
132        }
133        retry_call!(self.store.pool, srem, &self.name, &data)
134    }
135
136    /// Remove a given value from the set and return the new size of the set.
137    pub async fn drop(&self, value: &T) -> Result<usize, ErrorTypes> {
138        let data = serde_json::to_vec(&value)?;
139        let size: Option<usize> = retry_call!(method, self.store.pool, 
140            self.drop_card_script.key(&self.name).arg(&data), invoke_async)?;
141        Ok(size.unwrap_or_default())
142    }
143
144    /// Remove and return a random item from the set.
145    pub async fn random(&self) -> Result<Option<T>, ErrorTypes>{
146        let ret_val: Option<Vec<u8>> = retry_call!(self.store.pool, srandmember, &self.name)?;
147        match ret_val {
148            Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
149            None => Ok(None),
150        }
151    }
152
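    // TODO: add a variant of `random` that takes an optional count `num`.
    // Reference sketch only (Rust-style signature over what looks like a Python
    // body; not valid Rust as written):
    //
    // pub async fn random(&self, num=None) -> Result<Option<T>, ErrorTypes>{
    //     ret_val = retry_call(self.c.srandmember, self.name, num)
    //     if isinstance(ret_val, list):
    //         return [json.loads(s) for s in ret_val]
    //     else:
    //         return json.loads(ret_val)
    // }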
160
161    /// Remove and return a single value from the set.
162    pub async fn pop(&self) -> Result<Option<T>, ErrorTypes> {
163        let data: Option<Vec<u8>> = retry_call!(self.store.pool, spop, &self.name)?;
164        match data {
165            Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
166            None => Ok(None),
167        }
168    }
169
170    /// Remove and return all values from the set.
171    pub async fn pop_all(&self) -> Result<Vec<T>, ErrorTypes> {
172        let length = self.length().await?;
173        let mut command = cmd("SPOP");
174        let command = command.arg(&self.name).arg(length);
175        let data: Vec<Vec<u8>> = retry_call!(method, self.store.pool, command, query_async)?;
176        Ok(data.into_iter()
177            .map(|v| serde_json::from_slice::<T>(&v))
178            .collect::<Result<Vec<T>, _>>()?)
179    }
180
181    /// Remove and drop all values from the set.
182    pub async fn delete(&self) -> Result<(), ErrorTypes> {
183        retry_call!(self.store.pool, del, &self.name)
184    }
185}
186