redis_queue_rs/
redis_queue.rs1use redis::Commands;
2use serde::{Deserialize, Serialize};
3
4use crate::queue_lock::queue_lock::QueueLock;
5use crate::queue_lock::queue_lock_builder::QueueLockBuilder;
6use crate::queue_state::queue_element::QueueElement;
7
8#[derive(Clone)]
9pub struct RedisQueue<T> {
10 queue_data_type: std::marker::PhantomData<T>,
11
12 queue_name: String,
13 redis_client: redis::Client,
14
15 queue_lock_builder: QueueLockBuilder,
16}
17
18impl<T> RedisQueue<T>
19where T: Clone + Serialize + for<'de> Deserialize<'de> {
20 pub fn new(queue_name: String, redis_client: redis::Client) -> RedisQueue<T> {
21 let queue_lock_builder = QueueLockBuilder::default()
22 .with_queue_name(queue_name.clone())
23 .with_redis_client(redis_client.clone());
24 RedisQueue {
25 queue_data_type: std::marker::PhantomData,
26 queue_name,
27 queue_lock_builder,
28 redis_client,
29 }
30 }
31
32 pub fn push(&mut self, item: T) {
33 self.get_lock().lock(move || {
34 let element = QueueElement::new(item);
35 self.push_element(element.clone());
36
37 if self.get_first_element_id().is_none() {
38 self.set_first_element_id(element.get_id());
39 }
40
41 self.set_last_element_id(element.get_id());
42 });
43 }
44
45 pub fn pop(&mut self) -> Option<T> {
46 self.get_lock().lock(|| {
47 let option_element_id = self.get_first_element_id();
48 if option_element_id.is_none() {
49 return None;
50 }
51
52 let element_id = option_element_id.unwrap();
53
54 let option_element = self.get_element(element_id.clone());
55 if option_element.is_none() {
56 return None;
57 }
58 let first_element = option_element.unwrap();
59
60 if first_element.get_next().is_none() {
61 self.unset_first_element_id();
62 self.unset_last_element_id();
63 } else {
64 let next_element_id = first_element.get_next().unwrap();
65 self.set_first_element_id(next_element_id);
66 }
67
68 self.delete_element(element_id.clone());
69
70 Some(first_element.get_data())
71 })
72 }
73
74 fn get_element(&mut self, element_id: String) -> Option<QueueElement<T>> {
75 let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
76 self.redis_connection()
77 .get(element_key)
78 .map_or(None, |data: String| serde_json::from_str(&data).ok())
79 }
80
81 fn push_element(&mut self, element: QueueElement<T>) {
82 let element_key = format!(
83 "redis-queue:{}:element:{}",
84 self.queue_name,
85 element.get_id()
86 );
87 let element_data = serde_json::to_string(&element).unwrap();
88
89 self.redis_connection()
90 .set::<String, String, String>(element_key, element_data)
91 .unwrap();
92 }
93
94 fn get_first_element_id(&mut self) -> Option<String> {
95 let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
96 self.redis_connection().get(first_element_key).ok()
97 }
98
99 fn set_first_element_id(&mut self, element_id: String) {
100 let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
101 self.redis_connection()
102 .set::<String, String, String>(first_element_key, element_id)
103 .unwrap();
104 }
105
106 fn unset_first_element_id(&mut self) {
107 let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
108 self.redis_connection()
109 .del::<String, u8>(first_element_key)
110 .unwrap();
111 }
112
113 fn get_last_element_id(&mut self) -> Option<String> {
114 let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
115 self.redis_connection().get(last_element_key).ok()
116 }
117
118 fn set_last_element_id(&mut self, element_id: String) {
119 let current_last_element_id = self.get_last_element_id();
120 if current_last_element_id.is_some() {
121 let current_last_element_id = current_last_element_id.unwrap();
122 let mut current_last_element = self.get_element(current_last_element_id).unwrap();
123 current_last_element.set_next(Some(element_id.clone()));
124 self.update_element(current_last_element);
125 }
126
127 let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
128 self.redis_connection()
129 .set::<String, String, String>(last_element_key, element_id)
130 .unwrap();
131 }
132
133 fn unset_last_element_id(&mut self) {
134 let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
135 self.redis_connection()
136 .del::<String, u8>(last_element_key)
137 .unwrap();
138 }
139
140 fn delete_element(&mut self, element_id: String) {
141 let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
142 self.redis_connection()
143 .del::<String, u8>(element_key)
144 .unwrap();
145 }
146
147 fn update_element(&mut self, element: QueueElement<T>) {
148 let element_key = format!(
149 "redis-queue:{}:element:{}",
150 self.queue_name,
151 element.get_id()
152 );
153 let element_data = serde_json::to_string(&element).unwrap();
154
155 self.redis_connection()
156 .set::<String, String, String>(element_key, element_data)
157 .unwrap();
158 }
159
160 fn get_lock(&self) -> QueueLock {
161 self.queue_lock_builder.clone().build()
162 }
163
164 fn redis_connection(&self) -> redis::Connection {
165 self.redis_client.get_connection().unwrap()
166 }
167}