redis_queue_rs/
async_redis_queue.rs1use redis::aio::MultiplexedConnection;
2use redis::AsyncCommands;
3use serde::{Deserialize, Serialize};
4
5use crate::queue_lock::async_queue_lock::AsyncQueueLock;
6use crate::queue_lock::queue_lock_builder::QueueLockBuilder;
7use crate::queue_state::queue_element::QueueElement;
8
9#[derive(Clone)]
10pub struct AsyncRedisQueue<T> {
11 queue_data_type: std::marker::PhantomData<T>,
12
13 queue_name: String,
14 redis_connection: MultiplexedConnection,
15
16 queue_lock_builder: QueueLockBuilder,
17}
18
19impl<T> AsyncRedisQueue<T>
20where T: Clone + Serialize + for<'de> Deserialize<'de> {
21 pub async fn new(queue_name: String, redis_client: redis::Client) -> AsyncRedisQueue<T> {
22 let queue_lock_builder = QueueLockBuilder::default()
23 .with_queue_name(queue_name.clone())
24 .with_redis_client(redis_client.clone());
25 AsyncRedisQueue {
26 queue_data_type: std::marker::PhantomData,
27 queue_name,
28 queue_lock_builder,
29 redis_connection: redis_client
30 .get_multiplexed_async_connection()
31 .await
32 .unwrap(),
33 }
34 }
35
36 pub async fn push(&mut self, item: T) {
37 self.get_lock()
38 .await
39 .lock(|| async move {
40 let element = QueueElement::new(item);
41
42 self.push_element(element.clone()).await;
43
44 if self.get_first_element_id().await.is_none() {
45 self.set_first_element_id(element.get_id()).await;
46 }
47
48 self.set_last_element_id(element.get_id()).await;
49 })
50 .await;
51 }
52
53 pub async fn pop(&mut self) -> Option<T> {
54 self.get_lock()
55 .await
56 .lock(|| async {
57 let option_element_id = self.get_first_element_id().await;
58 if option_element_id.is_none() {
59 return None;
60 }
61
62 let element_id = option_element_id.unwrap();
63
64 let option_element = self.get_element(element_id.clone()).await;
65 if option_element.is_none() {
66 return None;
67 }
68 let first_element = option_element.unwrap();
69
70 if first_element.get_next().is_none() {
71 self.unset_first_element_id().await;
72 self.unset_last_element_id().await;
73 } else {
74 let next_element_id = first_element.get_next().unwrap();
75 self.set_first_element_id(next_element_id).await;
76 }
77
78 self.delete_element(element_id.clone()).await;
79
80 Some(first_element.get_data())
81 })
82 .await
83 }
84
85 async fn get_element(&mut self, element_id: String) -> Option<QueueElement<T>> {
86 let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
87 self.redis_connection
88 .get(element_key)
89 .await
90 .map_or(None, |data: String| serde_json::from_str(&data).ok())
91 }
92
93 async fn push_element(&mut self, element: QueueElement<T>) {
94 let element_key = format!(
95 "redis-queue:{}:element:{}",
96 self.queue_name,
97 element.get_id()
98 );
99 let element_data = serde_json::to_string(&element).unwrap();
100
101 self.redis_connection
102 .set::<String, String, String>(element_key, element_data)
103 .await
104 .unwrap();
105 }
106
107 async fn get_first_element_id(&mut self) -> Option<String> {
108 let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
109 self.redis_connection.get(first_element_key).await.ok()
110 }
111
112 async fn set_first_element_id(&mut self, element_id: String) {
113 let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
114 self.redis_connection
115 .set::<String, String, String>(first_element_key, element_id)
116 .await
117 .unwrap();
118 }
119
120 async fn unset_first_element_id(&mut self) {
121 let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
122 self.redis_connection
123 .del::<String, u8>(first_element_key)
124 .await
125 .unwrap();
126 }
127
128 async fn get_last_element_id(&mut self) -> Option<String> {
129 let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
130 self.redis_connection.get(last_element_key).await.ok()
131 }
132
133 async fn set_last_element_id(&mut self, element_id: String) {
134 let current_last_element_id = self.get_last_element_id().await;
135 if current_last_element_id.is_some() {
136 let current_last_element_id = current_last_element_id.unwrap();
137 let mut current_last_element = self
138 .get_element(current_last_element_id)
139 .await
140 .unwrap();
141 current_last_element.set_next(Some(element_id.clone()));
142 self.update_element(current_last_element).await;
143 }
144
145 let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
146 self.redis_connection
147 .set::<String, String, String>(last_element_key, element_id)
148 .await
149 .unwrap();
150 }
151
152 async fn unset_last_element_id(&mut self) {
153 let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
154 self.redis_connection
155 .del::<String, u8>(last_element_key)
156 .await
157 .unwrap();
158 }
159
160 async fn delete_element(&mut self, element_id: String) {
161 let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
162 self.redis_connection
163 .del::<String, u8>(element_key)
164 .await
165 .unwrap();
166 }
167
168 async fn update_element(&mut self, element: QueueElement<T>) {
169 let element_key = format!(
170 "redis-queue:{}:element:{}",
171 self.queue_name,
172 element.get_id()
173 );
174 let element_data = serde_json::to_string(&element).unwrap();
175
176 self.redis_connection
177 .set::<String, String, String>(element_key, element_data)
178 .await
179 .unwrap();
180 }
181
182 async fn get_lock(&self) -> AsyncQueueLock {
183 self.queue_lock_builder.clone().async_build().await
184 }
185}