1use std::future::Future;
2use std::time::Duration;
3
4use deadpool_redis::redis::{self, AsyncCommands, RedisResult, Value};
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8#[derive(Clone, PartialEq, Eq, Debug)]
9pub struct KeyPrefix {
10 prefix: String,
11}
12
13impl KeyPrefix {
14 pub fn new(prefix: String) -> KeyPrefix {
15 KeyPrefix { prefix }
16 }
17
18 pub fn of(&self, name: &str) -> String {
19 let mut key = String::with_capacity(self.prefix.len() + name.len());
20 key.push_str(&self.prefix);
21 key.push_str(name);
22 key
23 }
24
25 pub fn and(&self, other: &str) -> KeyPrefix {
26 KeyPrefix::new(self.of(other))
27 }
28
29 pub fn concat(mut self, other: &str) -> KeyPrefix {
30 self.prefix.push_str(other);
31 self
32 }
33}
34
35impl From<String> for KeyPrefix {
36 fn from(prefix: String) -> KeyPrefix {
37 KeyPrefix::new(prefix)
38 }
39}
40
41impl From<&str> for KeyPrefix {
42 fn from(prefix: &str) -> KeyPrefix {
43 KeyPrefix::new(prefix.to_string())
44 }
45}
46
47impl From<KeyPrefix> for String {
48 fn from(key_prefix: KeyPrefix) -> String {
49 key_prefix.prefix
50 }
51}
52
53impl AsRef<str> for KeyPrefix {
54 fn as_ref(&self) -> &str {
55 &self.prefix
56 }
57}
58
59#[derive(Clone, Debug)]
60pub struct Item {
61 pub id: String,
62 pub data: Box<[u8]>,
63}
64
65impl Item {
66 pub fn new(data: Box<[u8]>) -> Item {
67 Item {
68 data,
69 id: Uuid::new_v4().to_string(),
70 }
71 }
72
73 pub fn from_string_data(data: String) -> Item {
74 Item::new(data.into_bytes().into_boxed_slice())
75 }
76
77 pub fn from_json_data<T: Serialize>(data: &T) -> serde_json::Result<Item> {
78 Ok(Item::new(serde_json::to_vec(data)?.into()))
79 }
80
81 pub fn data_json<'a, T: Deserialize<'a>>(&'a self) -> serde_json::Result<T> {
82 serde_json::from_slice(&self.data)
83 }
84
85 pub fn data_json_static<T: for<'de> Deserialize<'de>>(&self) -> serde_json::Result<T> {
86 serde_json::from_slice(&self.data)
87 }
88}
89
90pub struct WorkQueue {
91 session: String,
92 main_queue_key: String,
93 processing_key: String,
94 lease_key: KeyPrefix,
95 item_data_key: KeyPrefix,
96}
97
98impl WorkQueue {
99 pub fn new(name: KeyPrefix) -> WorkQueue {
100 WorkQueue {
101 session: Uuid::new_v4().to_string(),
102 main_queue_key: name.of(":queue"),
103 processing_key: name.of(":processing"),
104 lease_key: name.and(":leased_by_session:"),
105 item_data_key: name.and(":item:"),
106 }
107 }
108
109 pub async fn recover<C: AsyncCommands>(&self, db: &mut C) -> RedisResult<()> {
110 let processing: RedisResult<Value> = db.lrange(&self.processing_key, 0, -1).await;
111 let mut pipeline = Box::new(redis::pipe());
112 if let Ok(Value::Array(processing)) = processing {
113 for v in processing {
114 if let Value::SimpleString(item_id) = v {
115 let a: bool = db.exists(self.lease_key.of(&item_id)).await?;
116 let b: bool = db.exists(self.item_data_key.of(&item_id)).await?;
117 if !a && b {
118 tracing::info!("requeue '{}' -> item '{item_id}'", self.processing_key);
119 pipeline.lpush(&self.main_queue_key, &item_id);
120 }
121 }
122 }
123 }
124 pipeline.query_async(db).await
125 }
126
127 pub fn add_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) {
128 pipeline.set(self.item_data_key.of(&item.id), item.data.as_ref());
129 pipeline.lpush(&self.main_queue_key, &item.id);
130 }
131
132 pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<()> {
133 let mut pipeline = Box::new(redis::pipe());
134 self.add_item_to_pipeline(&mut pipeline, item);
135 pipeline.query_async(db).await
136 }
137
138 pub fn queue_len<'a, C: AsyncCommands>(
139 &'a self,
140 db: &'a mut C,
141 ) -> impl Future<Output = RedisResult<usize>> + 'a {
142 db.llen(&self.main_queue_key)
143 }
144
145 pub fn processing<'a, C: AsyncCommands>(
146 &'a self,
147 db: &'a mut C,
148 ) -> impl Future<Output = RedisResult<usize>> + 'a {
149 db.llen(&self.processing_key)
150 }
151
152 pub async fn lease<C: AsyncCommands>(
153 &self,
154 db: &mut C,
155 timeout: Option<Duration>,
156 lease_duration: Duration,
157 ) -> RedisResult<Option<Item>> {
158 let item_id: Option<String> = match timeout {
159 Some(Duration::ZERO) => {
160 db.lmove(
161 &self.main_queue_key,
162 &self.processing_key,
163 redis::Direction::Right,
164 redis::Direction::Left,
165 )
166 .await?
167 }
168 _ => {
169 db.blmove(
170 &self.main_queue_key,
171 &self.processing_key,
172 redis::Direction::Right,
173 redis::Direction::Left,
174 timeout.map(|d| d.as_secs() as f64).unwrap_or(0f64),
175 )
176 .await?
177 }
178 };
179
180 let item = match item_id {
181 Some(item_id) => Item {
182 data: db
183 .get::<_, Vec<u8>>(self.item_data_key.of(&item_id))
184 .await?
185 .into_boxed_slice(),
186 id: item_id,
187 },
188 None => return Ok(None),
189 };
190
191 let _: () = db
192 .set_ex(
193 self.lease_key.of(&item.id),
194 &self.session,
195 lease_duration.as_secs(),
196 )
197 .await?;
198
199 Ok(Some(item))
200 }
201
202 pub async fn complete<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
203 let removed: usize = db.lrem(&self.processing_key, 0, &item.id).await?;
204 if removed == 0 {
205 return Ok(false);
206 }
207 let _: () = redis::pipe()
208 .del(self.item_data_key.of(&item.id))
209 .del(self.lease_key.of(&item.id))
210 .query_async(db)
211 .await?;
212 Ok(true)
213 }
214}