1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
use std::{future::Future, time::Duration};
use redis::{AsyncCommands, Client, Commands};
use thiserror::Error;
use tokio::{task::JoinHandle, time::sleep};
/// Error type returned by the fallible [`JobQueue`] operations.
#[derive(Debug, Error)]
pub enum JobQueueError {
/// An error produced by the `redis` crate.
///
/// The `#[from]` attribute generates `From<redis::RedisError>`, which is what
/// lets the queue methods propagate Redis failures with `?`.
#[error("Redis Error: {0}")]
RedisError(#[from] redis::RedisError),
}
/// A job queue stored in a single Redis list.
///
/// Jobs are appended to the tail of the list with `RPUSH` and taken from its
/// head with `BLPOP`, so they are consumed in the order they were pushed.
pub struct JobQueue {
/// Redis client from which the queue methods obtain their connections.
client: Client,
/// Key (name) of the Redis list that holds the queued jobs.
queue_key: String,
}
impl JobQueue {
/// Create a new JobQueue instance.
///
/// # Arguments
/// * `redis_url` - The URL to the Redis server (e.g., "redis://127.0.0.1/").
/// * `queue_key` - The key(name) in Redis that all data will be stored.
///
/// # Returns
/// * `Ok(JobQueue)`
/// * `Err(JobQueueError)` if an error occurs while communicating with Redis
pub fn new(redis_url: &str, queue_key: &str) -> Result<Self, JobQueueError> {
let client = Client::open(redis_url)?;
Ok(Self {
client,
queue_key: queue_key.to_string(),
})
}
/// Push a job into the queue
///
/// # Arguments
/// * `queue_value` - The value to push into the queue (e.g. JSON String)
///
/// # Returns
/// * `Ok(i64)` the current length of queue, representing the item place.
pub async fn push_job(&self, queue_value: &str) -> Result<i64, JobQueueError> {
let mut conn = self.client.get_multiplexed_async_connection().await?;
let len: i64 = conn.rpush(&self.queue_key, queue_value).await?;
Ok(len)
}
/// Pop a first element in the queue
///
/// # Arguments
/// * `timeout` - The timeout value in seconds for the BLPOP command (e.g. 0.0 for no limit).
///
/// # Returns
/// * `Ok((String, String))` on success, which represents (queue_key, value).
/// * `Err(JobQueueError)` on failure
pub async fn pop_job(&self, timeout: f64) -> Result<(String, String), JobQueueError> {
let mut con = self.client.get_multiplexed_async_connection().await?;
let result: (String, String) = con.blpop(&self.queue_key, timeout).await?;
Ok(result)
}
/// Reset the entire queue by deleting the queue.
///
/// # Returns
/// * `Ok(())` on success.
/// * `Err(JobQueueError)` if an error occurs while communicating with Redis.
pub async fn del_queue(&self) -> Result<(), JobQueueError> {
let mut con = self.client.get_multiplexed_async_connection().await?;
let _: () = con.del(&self.queue_key).await?;
Ok(())
}
/// Listener to run the provided callback each time a job is received.
///
/// # Arguments
///
/// * `timeout` - The timeout value in seconds for the BLPOP command (e.g. 0.0 for no limit).
/// * `callback` - The callback function that takes a tuple `(queue_key, value)`.
///
/// # Returns
/// * `JoinHandle<()>` the listener thread
pub fn listen<F, Fut>(&self, timeout: f64, callback: F) -> JoinHandle<()>
where
F: Fn((String, String)) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let client = self.client.clone();
let queue_key = self.queue_key.clone();
tokio::spawn(async move {
loop {
// attempt to get a connection
let mut con = match client.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(err) => {
eprintln!("[Listener] Connection error: {}", err);
sleep(Duration::from_secs(1)).await;
continue;
}
};
// Execute BLPOP; the callback is called on receiving a new job.
match con.blpop(&queue_key, timeout).await {
Ok(res) => callback(res).await,
Err(err) => {
eprintln!("[Listener] BLPOP Error: {}", err);
sleep(Duration::from_secs(1)).await;
}
}
}
})
}
/// Locate the index of the first item in the queue that satisfies the given filter.
///
/// The queue stores data as strings (e.g., JSON or any string representation), and the provided closure
/// is used to filter these items.
///
/// # Arguments
/// * `f` - A closure that takes a reference to an item (as &str) and returns `Some(_)` if the item meets the criteria,
/// or `None` otherwise.
///
/// # Returns
/// * `Ok(Some(index))` if an item that satisfies the filter is found.
/// * `Ok(None)` if no matching item is found.
/// * `Err(JobQueueError)` if an error occurs while communicating with Redis.
pub async fn locate_item<T, F>(&self, f: F) -> Result<Option<usize>, JobQueueError>
where
F: Fn(&str) -> Option<T>,
{
let mut conn = self.client.get_multiplexed_async_connection().await?;
let items: Vec<String> = conn.lrange(&self.queue_key, 0, -1).await?;
for (index, elm) in items.iter().enumerate() {
if let Some(_) = f(elm) {
return Ok(Some(index));
}
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
    //! Integration tests for `JobQueue`.
    //!
    //! They talk to a real Redis server at `REDIS_URL`. Every test uses its own
    //! queue key and clears it first, so a previously aborted run cannot
    //! influence the result.

    use std::{sync::Arc, time::Duration};

    use tokio::sync::mpsc;
    use tokio::time::{sleep, timeout};

    use super::JobQueue;

    /// Address of the Redis server the tests run against.
    const REDIS_URL: &str = "redis://127.0.0.1/";

    /// Test that a single job can be pushed and popped correctly.
    #[tokio::test]
    async fn job_queue_test() {
        let queue_key = "test_queue_test";
        let queue_value = "test_queue_value";
        let job_queue = JobQueue::new(REDIS_URL, queue_key).unwrap();
        job_queue.del_queue().await.unwrap();

        let len = job_queue.push_job(queue_value).await.unwrap();
        assert_eq!(len, 1);

        let (key, value) = job_queue.pop_job(0.0).await.unwrap();
        assert_eq!(key, queue_key);
        assert_eq!(value, queue_value);
    }

    /// Test that jobs pushed by one task are popped, in order, by another task.
    #[tokio::test]
    async fn job_queue_multi_push_test() {
        let queue_key = "test_queue_multi_push";
        let job_queue = Arc::new(JobQueue::new(REDIS_URL, queue_key).unwrap());
        job_queue.del_queue().await.unwrap();

        let consumer_queue = Arc::clone(&job_queue);
        let consumer_handle = tokio::spawn(async move {
            let mut received = Vec::with_capacity(5);
            for _ in 0..5 {
                let (_, value) = consumer_queue
                    .pop_job(0.0)
                    .await
                    .expect("[Consumer] Failed to pop a job");
                received.push(value);
            }
            received
        });

        let mut pushed = Vec::with_capacity(5);
        for i in 0..5 {
            let queue_value = format!("job_{}", i);
            job_queue
                .push_job(&queue_value)
                .await
                .expect("[Producer] Failed to push a job");
            pushed.push(queue_value);
            // Short pause so the consumer really blocks between jobs.
            sleep(Duration::from_millis(100)).await;
        }

        let received = consumer_handle.await.unwrap();
        assert_eq!(received, pushed);
    }

    /// Test that the listener hands every pushed job to the callback, in order.
    #[tokio::test]
    async fn job_queue_listener_test() {
        let queue_key = "test_queue_listener";
        let job_queue = JobQueue::new(REDIS_URL, queue_key).unwrap();
        job_queue.del_queue().await.unwrap();

        let (tx, mut rx) = mpsc::channel(5);
        let listen_handler = job_queue.listen(0.0, move |(_, value)| {
            let tx = tx.clone();
            async move {
                tx.send(value).await.unwrap();
            }
        });

        let mut pushed = Vec::with_capacity(5);
        for i in 0..5 {
            let new_job = format!("job_number_{}", i);
            job_queue.push_job(&new_job).await.unwrap();
            pushed.push(new_job);
        }

        let mut received = Vec::with_capacity(5);
        for _ in 0..5 {
            // Bounded wait: a broken listener fails the test instead of hanging it.
            let value = timeout(Duration::from_secs(5), rx.recv())
                .await
                .expect("timed out waiting for the listener")
                .expect("listener channel closed early");
            received.push(value);
        }
        listen_handler.abort();

        assert_eq!(received, pushed);
    }

    /// Test that `locate_item` reports the index of the first match, or `None`.
    #[tokio::test]
    async fn locate_item_test() {
        let queue_key = "locate_item";
        let queue_value = "locate_item_value";
        let job_queue = JobQueue::new(REDIS_URL, queue_key).unwrap();
        job_queue.del_queue().await.unwrap();

        for i in 0..5 {
            let v = format!("locate_item_value_{}", i);
            job_queue.push_job(&v).await.unwrap();
        }
        job_queue.push_job(queue_value).await.unwrap();

        let pos = job_queue
            .locate_item(|item| (item == queue_value).then_some(()))
            .await
            .unwrap();
        let missing = job_queue
            .locate_item(|item| (item == "no_such_item").then_some(()))
            .await
            .unwrap();
        // Clean up before asserting so a failure does not leave the key behind.
        job_queue.del_queue().await.unwrap();

        assert_eq!(pos, Some(5));
        assert_eq!(missing, None);
    }
}