qm_redis/
work_queue.rs

1use std::future::Future;
2use std::time::Duration;
3
4use deadpool_redis::redis::{self, AsyncCommands, RedisResult, Value};
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8#[derive(Clone, PartialEq, Eq, Debug)]
9pub struct KeyPrefix {
10    prefix: String,
11}
12
13impl KeyPrefix {
14    pub fn new(prefix: String) -> KeyPrefix {
15        KeyPrefix { prefix }
16    }
17
18    pub fn of(&self, name: &str) -> String {
19        let mut key = String::with_capacity(self.prefix.len() + name.len());
20        key.push_str(&self.prefix);
21        key.push_str(name);
22        key
23    }
24
25    pub fn and(&self, other: &str) -> KeyPrefix {
26        KeyPrefix::new(self.of(other))
27    }
28
29    pub fn concat(mut self, other: &str) -> KeyPrefix {
30        self.prefix.push_str(other);
31        self
32    }
33}
34
35impl From<String> for KeyPrefix {
36    fn from(prefix: String) -> KeyPrefix {
37        KeyPrefix::new(prefix)
38    }
39}
40
41impl From<&str> for KeyPrefix {
42    fn from(prefix: &str) -> KeyPrefix {
43        KeyPrefix::new(prefix.to_string())
44    }
45}
46
47impl From<KeyPrefix> for String {
48    fn from(key_prefix: KeyPrefix) -> String {
49        key_prefix.prefix
50    }
51}
52
53impl AsRef<str> for KeyPrefix {
54    fn as_ref(&self) -> &str {
55        &self.prefix
56    }
57}
58
59#[derive(Clone, Debug)]
60pub struct Item {
61    pub id: String,
62    pub data: Box<[u8]>,
63}
64
65impl Item {
66    pub fn new(data: Box<[u8]>) -> Item {
67        Item {
68            data,
69            id: Uuid::new_v4().to_string(),
70        }
71    }
72
73    pub fn from_string_data(data: String) -> Item {
74        Item::new(data.into_bytes().into_boxed_slice())
75    }
76
77    pub fn from_json_data<T: Serialize>(data: &T) -> serde_json::Result<Item> {
78        Ok(Item::new(serde_json::to_vec(data)?.into()))
79    }
80
81    pub fn data_json<'a, T: Deserialize<'a>>(&'a self) -> serde_json::Result<T> {
82        serde_json::from_slice(&self.data)
83    }
84
85    pub fn data_json_static<T: for<'de> Deserialize<'de>>(&self) -> serde_json::Result<T> {
86        serde_json::from_slice(&self.data)
87    }
88}
89
90pub struct WorkQueue {
91    session: String,
92    main_queue_key: String,
93    processing_key: String,
94    lease_key: KeyPrefix,
95    item_data_key: KeyPrefix,
96}
97
98impl WorkQueue {
99    pub fn new(name: KeyPrefix) -> WorkQueue {
100        WorkQueue {
101            session: Uuid::new_v4().to_string(),
102            main_queue_key: name.of(":queue"),
103            processing_key: name.of(":processing"),
104            lease_key: name.and(":leased_by_session:"),
105            item_data_key: name.and(":item:"),
106        }
107    }
108
109    pub async fn recover<C: AsyncCommands>(&self, db: &mut C) -> RedisResult<()> {
110        let processing: RedisResult<Value> = db.lrange(&self.processing_key, 0, -1).await;
111        let mut pipeline = Box::new(redis::pipe());
112        if let Ok(Value::Array(processing)) = processing {
113            for v in processing {
114                if let Value::SimpleString(item_id) = v {
115                    let a: bool = db.exists(self.lease_key.of(&item_id)).await?;
116                    let b: bool = db.exists(self.item_data_key.of(&item_id)).await?;
117                    if !a && b {
118                        tracing::info!("requeue '{}' -> item '{item_id}'", self.processing_key);
119                        pipeline.lpush(&self.main_queue_key, &item_id);
120                    }
121                }
122            }
123        }
124        pipeline.query_async(db).await
125    }
126
127    pub fn add_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) {
128        pipeline.set(self.item_data_key.of(&item.id), item.data.as_ref());
129        pipeline.lpush(&self.main_queue_key, &item.id);
130    }
131
132    pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<()> {
133        let mut pipeline = Box::new(redis::pipe());
134        self.add_item_to_pipeline(&mut pipeline, item);
135        pipeline.query_async(db).await
136    }
137
138    pub fn queue_len<'a, C: AsyncCommands>(
139        &'a self,
140        db: &'a mut C,
141    ) -> impl Future<Output = RedisResult<usize>> + 'a {
142        db.llen(&self.main_queue_key)
143    }
144
145    pub fn processing<'a, C: AsyncCommands>(
146        &'a self,
147        db: &'a mut C,
148    ) -> impl Future<Output = RedisResult<usize>> + 'a {
149        db.llen(&self.processing_key)
150    }
151
152    pub async fn lease<C: AsyncCommands>(
153        &self,
154        db: &mut C,
155        timeout: Option<Duration>,
156        lease_duration: Duration,
157    ) -> RedisResult<Option<Item>> {
158        let item_id: Option<String> = match timeout {
159            Some(Duration::ZERO) => {
160                db.lmove(
161                    &self.main_queue_key,
162                    &self.processing_key,
163                    redis::Direction::Right,
164                    redis::Direction::Left,
165                )
166                .await?
167            }
168            _ => {
169                db.blmove(
170                    &self.main_queue_key,
171                    &self.processing_key,
172                    redis::Direction::Right,
173                    redis::Direction::Left,
174                    timeout.map(|d| d.as_secs() as f64).unwrap_or(0f64),
175                )
176                .await?
177            }
178        };
179
180        let item = match item_id {
181            Some(item_id) => Item {
182                data: db
183                    .get::<_, Vec<u8>>(self.item_data_key.of(&item_id))
184                    .await?
185                    .into_boxed_slice(),
186                id: item_id,
187            },
188            None => return Ok(None),
189        };
190
191        let _: () = db
192            .set_ex(
193                self.lease_key.of(&item.id),
194                &self.session,
195                lease_duration.as_secs(),
196            )
197            .await?;
198
199        Ok(Some(item))
200    }
201
202    pub async fn complete<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
203        let removed: usize = db.lrem(&self.processing_key, 0, &item.id).await?;
204        if removed == 0 {
205            return Ok(false);
206        }
207        let _: () = redis::pipe()
208            .del(self.item_data_key.of(&item.id))
209            .del(self.lease_key.of(&item.id))
210            .query_async(db)
211            .await?;
212        Ok(true)
213    }
214}