redis_queue_rs/
async_redis_queue.rs

1use redis::aio::MultiplexedConnection;
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4
5use crate::queue_lock::async_queue_lock::AsyncQueueLock;
6use crate::queue_lock::queue_lock_builder::QueueLockBuilder;
7use crate::queue_state::queue_element::QueueElement;
8
9#[derive(Clone)]
10pub struct AsyncRedisQueue<T> {
11    queue_data_type: std::marker::PhantomData<T>,
12    
13    queue_name: String,
14    redis_connection: MultiplexedConnection,
15
16    queue_lock_builder: QueueLockBuilder,
17}
18
19impl<T> AsyncRedisQueue<T>
20where T: Clone + Serialize + for<'de> Deserialize<'de> {
21    pub async fn new(queue_name: String, redis_client: redis::Client) -> AsyncRedisQueue<T> {
22        let queue_lock_builder = QueueLockBuilder::default()
23            .with_queue_name(queue_name.clone())
24            .with_redis_client(redis_client.clone());
25        AsyncRedisQueue {
26            queue_data_type: std::marker::PhantomData,
27            queue_name,
28            queue_lock_builder,
29            redis_connection: redis_client
30                .get_multiplexed_async_connection()
31                .await
32                .unwrap(),
33        }
34    }
35
36    pub async fn push(&mut self, item: T) {
37        self.get_lock()
38            .await
39            .lock(|| async move {
40                let element = QueueElement::new(item);
41
42                self.push_element(element.clone()).await;
43
44                if self.get_first_element_id().await.is_none() {
45                    self.set_first_element_id(element.get_id()).await;
46                }
47
48                self.set_last_element_id(element.get_id()).await;
49            })
50            .await;
51    }
52
53    pub async fn pop(&mut self) -> Option<T> {
54        self.get_lock()
55            .await
56            .lock(|| async {
57                let option_element_id = self.get_first_element_id().await;
58                if option_element_id.is_none() {
59                    return None;
60                }
61
62                let element_id = option_element_id.unwrap();
63
64                let option_element = self.get_element(element_id.clone()).await;
65                if option_element.is_none() {
66                    return None;
67                }
68                let first_element = option_element.unwrap();
69
70                if first_element.get_next().is_none() {
71                    self.unset_first_element_id().await;
72                    self.unset_last_element_id().await;
73                } else {
74                    let next_element_id = first_element.get_next().unwrap();
75                    self.set_first_element_id(next_element_id).await;
76                }
77
78                self.delete_element(element_id.clone()).await;
79
80                Some(first_element.get_data())
81            })
82            .await
83    }
84
85    async fn get_element(&mut self, element_id: String) -> Option<QueueElement<T>> {
86        let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
87        self.redis_connection
88            .get(element_key)
89            .await
90            .map_or(None, |data: String| serde_json::from_str(&data).ok())
91    }
92
93    async fn push_element(&mut self, element: QueueElement<T>) {
94        let element_key = format!(
95            "redis-queue:{}:element:{}",
96            self.queue_name,
97            element.get_id()
98        );
99        let element_data = serde_json::to_string(&element).unwrap();
100
101        self.redis_connection
102            .set::<String, String, String>(element_key, element_data)
103            .await
104            .unwrap();
105    }
106
107    async fn get_first_element_id(&mut self) -> Option<String> {
108        let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
109        self.redis_connection.get(first_element_key).await.ok()
110    }
111
112    async fn set_first_element_id(&mut self, element_id: String) {
113        let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
114        self.redis_connection
115            .set::<String, String, String>(first_element_key, element_id)
116            .await
117            .unwrap();
118    }
119
120    async fn unset_first_element_id(&mut self) {
121        let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
122        self.redis_connection
123            .del::<String, u8>(first_element_key)
124            .await
125            .unwrap();
126    }
127
128    async fn get_last_element_id(&mut self) -> Option<String> {
129        let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
130        self.redis_connection.get(last_element_key).await.ok()
131    }
132
133    async fn set_last_element_id(&mut self, element_id: String) {
134        let current_last_element_id = self.get_last_element_id().await;
135        if current_last_element_id.is_some() {
136            let current_last_element_id = current_last_element_id.unwrap();
137            let mut current_last_element = self
138                .get_element(current_last_element_id)
139                .await
140                .unwrap();
141            current_last_element.set_next(Some(element_id.clone()));
142            self.update_element(current_last_element).await;
143        }
144
145        let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
146        self.redis_connection
147            .set::<String, String, String>(last_element_key, element_id)
148            .await
149            .unwrap();
150    }
151
152    async fn unset_last_element_id(&mut self) {
153        let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
154        self.redis_connection
155            .del::<String, u8>(last_element_key)
156            .await
157            .unwrap();
158    }
159
160    async fn delete_element(&mut self, element_id: String) {
161        let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
162        self.redis_connection
163            .del::<String, u8>(element_key)
164            .await
165            .unwrap();
166    }
167
168    async fn update_element(&mut self, element: QueueElement<T>) {
169        let element_key = format!(
170            "redis-queue:{}:element:{}",
171            self.queue_name,
172            element.get_id()
173        );
174        let element_data = serde_json::to_string(&element).unwrap();
175
176        self.redis_connection
177            .set::<String, String, String>(element_key, element_data)
178            .await
179            .unwrap();
180    }
181
182    async fn get_lock(&self) -> AsyncQueueLock {
183        self.queue_lock_builder.clone().async_build().await
184    }
185}