redis_queue_rs/
lib.rs

1pub mod async_redis_queue;
2mod queue_lock;
3mod queue_state;
4pub mod redis_queue;
5mod test_utils;
6
/// Smoke tests for the blocking [`RedisQueue`] and the [`AsyncRedisQueue`].
///
/// Each test obtains its client from `initialize_redis_client()` and names its
/// queue after the test function, so no two tests use the same queue name.
/// NOTE(review): presumably these tests need a reachable Redis server — confirm
/// in `test_utils::init_redis`.
#[cfg(test)]
mod tests {
    use crate::async_redis_queue::AsyncRedisQueue;
    use crate::redis_queue::RedisQueue;
    use crate::test_utils::init_redis::initialize_redis_client;

    /// Constructing a blocking queue of `String` items must not panic.
    #[test]
    fn initialize_redis_queue() {
        let _: RedisQueue<String> = RedisQueue::new(
            "initialize_redis_queue".to_string(),
            initialize_redis_client(),
        );
    }

    /// A single pushed item is returned by the following pop.
    #[test]
    fn test_push_pop_to_redis_queue() {
        let mut redis_queue = RedisQueue::new(
            "test_push_pop_to_redis_queue".to_string(),
            initialize_redis_client(),
        );

        let item = "test".to_string();

        redis_queue.push(item.clone());
        let result = redis_queue.pop().unwrap();
        assert_eq!(result, item);
    }

    /// Pushes the same item from 100 threads, then pops it from 100 threads.
    ///
    /// All pushes are joined before the first pop thread is spawned, so every
    /// pop is expected to yield a value equal to the pushed item.
    #[test]
    fn test_redis_queue_with_concurrent_push_pop() {
        let redis_queue = RedisQueue::new(
            "test_redis_queue_with_concurrent_push_pop".to_string(),
            initialize_redis_client(),
        );
        let item = "test".to_string();
        let num_tasks = 100;

        // Phase 1: spawn every pusher first; `collect` drives the whole range
        // so all threads are running before any of them is joined.
        let pushers: Vec<_> = (0..num_tasks)
            .map(|_| {
                let mut queue = redis_queue.clone();
                let local_item = item.clone();
                std::thread::spawn(move || {
                    // The thread owns `local_item`, so it is moved rather than cloned again.
                    queue.push(local_item);
                })
            })
            .collect();

        for handle in pushers {
            handle.join().unwrap();
        }

        // Phase 2: one popper thread per pushed item, each asserting on its own thread.
        let poppers: Vec<_> = (0..num_tasks)
            .map(|_| {
                let expected = item.clone();
                let mut queue = redis_queue.clone();
                std::thread::spawn(move || {
                    let value = queue.pop();
                    assert_eq!(value.unwrap(), expected);
                })
            })
            .collect();

        // Joining re-raises any assertion failure that happened inside a popper.
        for handle in poppers {
            handle.join().unwrap();
        }
    }

    /// Constructing an async queue of `String` items must not panic.
    #[tokio::test]
    async fn initialize_async_redis_queue() {
        let _: AsyncRedisQueue<String> = AsyncRedisQueue::new(
            "initialize_async_redis_queue".to_string(),
            initialize_redis_client(),
        )
        .await;
    }

    /// A single pushed item is returned by the following async pop.
    #[tokio::test]
    async fn test_async_push_pop_to_redis_queue() {
        let mut redis_queue = AsyncRedisQueue::new(
            "test_async_push_pop_to_redis_queue".to_string(),
            initialize_redis_client(),
        )
        .await;

        let item = "test".to_string();

        redis_queue.push(item.clone()).await;
        let result = redis_queue.pop().await.unwrap();
        assert_eq!(result, item);
    }

    /// Pushes the same item from 100 spawned tasks, then pops it from 100 tasks.
    ///
    /// All push tasks are awaited before the first pop task is spawned, so
    /// every pop is expected to yield a value equal to the pushed item.
    #[tokio::test]
    async fn test_async_redis_queue_with_concurrent_push_pop() {
        let redis_queue = AsyncRedisQueue::new(
            "test_async_redis_queue_with_concurrent_push_pop".to_string(),
            initialize_redis_client(),
        )
        .await;
        let item = "test".to_string();
        let num_tasks = 100;

        // Phase 1: spawn every push task before awaiting any of them.
        let pushers: Vec<_> = (0..num_tasks)
            .map(|_| {
                let mut queue = redis_queue.clone();
                let local_item = item.clone();
                tokio::spawn(async move {
                    // The task owns `local_item`, so it is moved rather than cloned again.
                    queue.push(local_item).await;
                })
            })
            .collect();

        for handle in pushers {
            handle.await.unwrap();
        }

        // Phase 2: one pop task per pushed item, each asserting inside the task.
        let poppers: Vec<_> = (0..num_tasks)
            .map(|_| {
                let expected = item.clone();
                let mut queue = redis_queue.clone();
                tokio::spawn(async move {
                    let value = queue.pop().await;
                    assert_eq!(value.unwrap(), expected);
                })
            })
            .collect();

        // Awaiting the handle surfaces a panic from inside the task as an `Err`.
        for handle in poppers {
            handle.await.unwrap();
        }
    }
}