redis_queue_rs/
redis_queue.rs

1use redis::Commands;
2use serde::{Deserialize, Serialize};
3
4use crate::queue_lock::queue_lock::QueueLock;
5use crate::queue_lock::queue_lock_builder::QueueLockBuilder;
6use crate::queue_state::queue_element::QueueElement;
7
8#[derive(Clone)]
9pub struct RedisQueue<T> {
10    queue_data_type: std::marker::PhantomData<T>,
11    
12    queue_name: String,
13    redis_client: redis::Client,
14
15    queue_lock_builder: QueueLockBuilder,
16}
17
18impl<T> RedisQueue<T> 
19where T: Clone + Serialize + for<'de> Deserialize<'de> {
20    pub fn new(queue_name: String, redis_client: redis::Client) -> RedisQueue<T> {
21        let queue_lock_builder = QueueLockBuilder::default()
22            .with_queue_name(queue_name.clone())
23            .with_redis_client(redis_client.clone());
24        RedisQueue {
25            queue_data_type: std::marker::PhantomData,
26            queue_name,
27            queue_lock_builder,
28            redis_client,
29        }
30    }
31
32    pub fn push(&mut self, item: T) {
33        self.get_lock().lock(move || {
34            let element = QueueElement::new(item);
35            self.push_element(element.clone());
36
37            if self.get_first_element_id().is_none() {
38                self.set_first_element_id(element.get_id());
39            }
40
41            self.set_last_element_id(element.get_id());
42        });
43    }
44
45    pub fn pop(&mut self) -> Option<T> {
46        self.get_lock().lock(|| {
47            let option_element_id = self.get_first_element_id();
48            if option_element_id.is_none() {
49                return None;
50            }
51
52            let element_id = option_element_id.unwrap();
53
54            let option_element = self.get_element(element_id.clone());
55            if option_element.is_none() {
56                return None;
57            }
58            let first_element = option_element.unwrap();
59
60            if first_element.get_next().is_none() {
61                self.unset_first_element_id();
62                self.unset_last_element_id();
63            } else {
64                let next_element_id = first_element.get_next().unwrap();
65                self.set_first_element_id(next_element_id);
66            }
67
68            self.delete_element(element_id.clone());
69
70            Some(first_element.get_data())
71        })
72    }
73
74    fn get_element(&mut self, element_id: String) -> Option<QueueElement<T>> {
75        let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
76        self.redis_connection()
77            .get(element_key)
78            .map_or(None, |data: String| serde_json::from_str(&data).ok())
79    }
80
81    fn push_element(&mut self, element: QueueElement<T>) {
82        let element_key = format!(
83            "redis-queue:{}:element:{}",
84            self.queue_name,
85            element.get_id()
86        );
87        let element_data = serde_json::to_string(&element).unwrap();
88
89        self.redis_connection()
90            .set::<String, String, String>(element_key, element_data)
91            .unwrap();
92    }
93
94    fn get_first_element_id(&mut self) -> Option<String> {
95        let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
96        self.redis_connection().get(first_element_key).ok()
97    }
98
99    fn set_first_element_id(&mut self, element_id: String) {
100        let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
101        self.redis_connection()
102            .set::<String, String, String>(first_element_key, element_id)
103            .unwrap();
104    }
105
106    fn unset_first_element_id(&mut self) {
107        let first_element_key = format!("redis-queue:{}:state:first", self.queue_name);
108        self.redis_connection()
109            .del::<String, u8>(first_element_key)
110            .unwrap();
111    }
112
113    fn get_last_element_id(&mut self) -> Option<String> {
114        let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
115        self.redis_connection().get(last_element_key).ok()
116    }
117
118    fn set_last_element_id(&mut self, element_id: String) {
119        let current_last_element_id = self.get_last_element_id();
120        if current_last_element_id.is_some() {
121            let current_last_element_id = current_last_element_id.unwrap();
122            let mut current_last_element = self.get_element(current_last_element_id).unwrap();
123            current_last_element.set_next(Some(element_id.clone()));
124            self.update_element(current_last_element);
125        }
126
127        let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
128        self.redis_connection()
129            .set::<String, String, String>(last_element_key, element_id)
130            .unwrap();
131    }
132
133    fn unset_last_element_id(&mut self) {
134        let last_element_key = format!("redis-queue:{}:state:last", self.queue_name);
135        self.redis_connection()
136            .del::<String, u8>(last_element_key)
137            .unwrap();
138    }
139
140    fn delete_element(&mut self, element_id: String) {
141        let element_key = format!("redis-queue:{}:element:{}", self.queue_name, element_id);
142        self.redis_connection()
143            .del::<String, u8>(element_key)
144            .unwrap();
145    }
146
147    fn update_element(&mut self, element: QueueElement<T>) {
148        let element_key = format!(
149            "redis-queue:{}:element:{}",
150            self.queue_name,
151            element.get_id()
152        );
153        let element_data = serde_json::to_string(&element).unwrap();
154
155        self.redis_connection()
156            .set::<String, String, String>(element_key, element_data)
157            .unwrap();
158    }
159
160    fn get_lock(&self) -> QueueLock {
161        self.queue_lock_builder.clone().build()
162    }
163
164    fn redis_connection(&self) -> redis::Connection {
165        self.redis_client.get_connection().unwrap()
166    }
167}