1pub mod async_redis_queue;
2mod queue_lock;
3mod queue_state;
4pub mod redis_queue;
5mod test_utils;
6
#[cfg(test)]
mod tests {
    use crate::async_redis_queue::AsyncRedisQueue;
    use crate::redis_queue::RedisQueue;
    use crate::test_utils::init_redis::initialize_redis_client;

    /// How many producer tasks, and then how many consumer tasks, each
    /// concurrent test launches.
    const TASK_COUNT: usize = 100;

    // NOTE(review): every test obtains its client from
    // `initialize_redis_client()`; presumably that needs a reachable Redis
    // instance — confirm against `test_utils::init_redis`.

    /// Constructing a blocking queue of `String`s must not panic.
    #[test]
    fn initialize_redis_queue() {
        let _queue: RedisQueue<String> = RedisQueue::new(
            String::from("initialize_redis_queue"),
            initialize_redis_client(),
        );
    }

    /// An item pushed onto the blocking queue is returned by the next pop.
    #[test]
    fn test_push_pop_to_redis_queue() {
        let mut queue: RedisQueue<String> = RedisQueue::new(
            String::from("test_push_pop_to_redis_queue"),
            initialize_redis_client(),
        );
        let item = String::from("test");

        queue.push(item.clone());

        let result = queue.pop().unwrap();
        assert_eq!(result, item);
    }

    /// Pushes the same item from `TASK_COUNT` threads, waits for all of them,
    /// then pops from `TASK_COUNT` threads and checks every popped value.
    #[test]
    fn test_redis_queue_with_concurrent_push_pop() {
        let shared_queue: RedisQueue<String> = RedisQueue::new(
            String::from("test_redis_queue_with_concurrent_push_pop"),
            initialize_redis_client(),
        );
        let item = String::from("test");

        // Spawn every producer before joining any of them, so the pushes can
        // actually overlap.
        let producers: Vec<_> = (0..TASK_COUNT)
            .map(|_| {
                let mut producer = shared_queue.clone();
                let payload = item.clone();
                std::thread::spawn(move || {
                    producer.push(payload.clone());
                })
            })
            .collect();
        for producer in producers {
            producer.join().unwrap();
        }

        // Same pattern for the consumers: a failed assertion inside a thread
        // surfaces here through `join().unwrap()`.
        let consumers: Vec<_> = (0..TASK_COUNT)
            .map(|_| {
                let expected = item.clone();
                let mut consumer = shared_queue.clone();
                std::thread::spawn(move || {
                    let popped = consumer.pop();
                    assert_eq!(popped.unwrap(), expected);
                })
            })
            .collect();
        for consumer in consumers {
            consumer.join().unwrap();
        }
    }

    /// Constructing an async queue of `String`s must not panic.
    #[tokio::test]
    async fn initialize_async_redis_queue() {
        let _queue: AsyncRedisQueue<String> = AsyncRedisQueue::new(
            String::from("initialize_async_redis_queue"),
            initialize_redis_client(),
        )
        .await;
    }

    /// An item pushed onto the async queue is returned by the next pop.
    #[tokio::test]
    async fn test_async_push_pop_to_redis_queue() {
        let mut queue: AsyncRedisQueue<String> = AsyncRedisQueue::new(
            String::from("test_async_push_pop_to_redis_queue"),
            initialize_redis_client(),
        )
        .await;
        let item = String::from("test");

        queue.push(item.clone()).await;

        let result = queue.pop().await.unwrap();
        assert_eq!(result, item);
    }

    /// Pushes the same item from `TASK_COUNT` spawned tasks, awaits all of
    /// them, then pops from `TASK_COUNT` spawned tasks and checks every
    /// popped value.
    #[tokio::test]
    async fn test_async_redis_queue_with_concurrent_push_pop() {
        let shared_queue: AsyncRedisQueue<String> = AsyncRedisQueue::new(
            String::from("test_async_redis_queue_with_concurrent_push_pop"),
            initialize_redis_client(),
        )
        .await;
        let item = String::from("test");

        // All producer tasks are spawned up front and only then awaited.
        let producers: Vec<_> = (0..TASK_COUNT)
            .map(|_| {
                let mut producer = shared_queue.clone();
                let payload = item.clone();
                tokio::spawn(async move {
                    producer.push(payload.clone()).await;
                })
            })
            .collect();
        for producer in producers {
            producer.await.unwrap();
        }

        // A failed assertion inside a task surfaces here through the
        // `unwrap()` on its join handle.
        let consumers: Vec<_> = (0..TASK_COUNT)
            .map(|_| {
                let expected = item.clone();
                let mut consumer = shared_queue.clone();
                tokio::spawn(async move {
                    let popped = consumer.pop().await;
                    assert_eq!(popped.unwrap(), expected);
                })
            })
            .collect();
        for consumer in consumers {
            consumer.await.unwrap();
        }
    }
}