1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
use std::{thread, time::Duration};
use redis::{Client, Commands, RedisResult};
pub struct JobQueue {
client: Client,
queue_key: String,
}
impl JobQueue {
/// Create a new JobQueue instance.
///
/// # Arguments
///
/// * `redis_url` - The URL to the Redis server (e.g., "redis://127.0.0.1/").
/// * `queue_key` - The key(name) in Redis that all data will be stored.
///
/// # Returns
///
/// Returns a `JobQueue` instance on success, or Redis error on failure.
pub fn new(redis_url: &str, queue_key: &str) -> RedisResult<Self> {
let client = Client::open(redis_url)?;
Ok(Self {
client,
queue_key: queue_key.to_string(),
})
}
/// Push a job into the queue
///
/// # Arguments
///
/// * `queue_value` - The value to push into the queue (e.g. JSON String)
///
/// # Returns
///
/// Return the current length of queue
pub fn push_job(&self, queue_value: &str) -> RedisResult<i64> {
let mut conn = self.client.get_connection()?;
let len: i64 = conn.lpush(&self.queue_key, queue_value)?;
Ok(len)
}
/// Pop a first element in the queue
///
/// # Arguments
///
/// * `timeout` - The timeout value in seconds for the BLPOP command (e.g. 0.0 for no limit).
///
/// # Returns
///
/// Return a tuple `(queue_key, value)` on success, wrapped in `RedisResult`.
pub fn pop_job(&self, timeout: f64) -> RedisResult<(String, String)> {
let mut con = self.client.get_connection()?;
let result: (String, String) = con.blpop(&self.queue_key, timeout)?;
Ok(result)
}
/// Reset the entire queue by deleting the queue.
///
/// # Returns
///
/// Return Ok(()) on success, or Redis error on failure.
pub fn del_queue(&self) -> RedisResult<()> {
let mut con = self.client.get_connection()?;
let _: () = con.del(&self.queue_key)?;
Ok(())
}
/// Listener to run the provided callback each time a job is received.
///
/// # Arguments
///
/// * `timeout` - The timeout value in seconds for the BLPOP command (e.g. 0.0 for no limit).
/// * `callback` - The callback function that takes a tuple `(queue_key, value)`.
///
/// # Returns
///
/// Return a `JoinHandle<()>` for the listener thread.
pub fn listen<F>(&self, timeout: f64, callback: F) -> thread::JoinHandle<()>
where
F: Fn((String, String)) + Send + 'static,
Client: Clone,
{
let client = self.client.clone();
let queue_key = self.queue_key.clone();
thread::spawn(move || loop {
// attempt to get a connection
let mut con = match client.get_connection() {
Ok(conn) => conn,
Err(err) => {
eprintln!("[Listener] Connection error: {}", err);
thread::sleep(Duration::from_secs(1));
continue;
}
};
// Execute BLPOP; the callback is called on receiving a new job.
match con.blpop(&queue_key, timeout) {
Ok(res) => callback(res),
Err(err) => {
eprintln!("[Listener] BLPOP Error: {}", err);
thread::sleep(Duration::from_secs(1));
}
}
})
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, thread, time::Duration};
use super::JobQueue;
/// Test that a single job can be pushed and popped correctly.
#[test]
fn job_queue_test() {
let queue_key = "test_queue_test";
let queue_value = "test_queue_value";
let job_queue = JobQueue::new("redis://127.0.0.1/", queue_key).unwrap();
job_queue.del_queue().unwrap();
let len = job_queue.push_job(&queue_value).unwrap();
assert_eq!(len, 1);
let result = job_queue.pop_job(0.0).unwrap();
println!("{:?}", result);
assert_eq!(result.1, queue_value)
}
/// Test that multiple pushing are handled in a separate thread.
#[test]
fn job_queue_multi_push_test() {
let queue_key = "test_queue_multi_push";
let job_queue = Arc::new(JobQueue::new("redis://127.0.0.1/", queue_key).unwrap());
job_queue.del_queue().unwrap();
let consumer_queue = Arc::clone(&job_queue);
let consumer_handle = thread::spawn(move || {
for i in 0..5 {
match consumer_queue.pop_job(0.0) {
Ok(job) => {
println!("[Consumer] {}: Queue key - {}, value - {}", i, job.0, job.1)
}
Err(e) => eprintln!("{:?}", e),
}
}
});
for i in 0..5 {
let queue_value = format!("job_{}", i);
job_queue
.push_job(&queue_value)
.expect("[Producer] Failed to push a job");
println!("[Producer] Pushed {}", queue_value);
thread::sleep(Duration::from_secs(1));
}
consumer_handle.join().unwrap();
}
/// Test that the listener functionality.
#[test]
fn job_queue_listener_test() {
let queue_key = "test_queue_listener";
let job_queue = JobQueue::new("redis://127.0.0.1/", queue_key).unwrap();
let listen_handler = job_queue.listen(0.0, move |(q, value)| {
println!("[Listener] Queue: {}, Job: {}", q, value);
});
for i in 0..5 {
let new_job = format!("job_number_{}", i);
let num = job_queue.push_job(&new_job).unwrap();
println!("[Producer] Pushed: {}. Current length: {}", new_job, num);
thread::sleep(Duration::from_millis(500));
}
thread::sleep(Duration::from_secs(1));
// listen_handler.join();
}
}