1use std::sync::{Arc, Mutex};
31use uuid::Uuid;
32
33#[derive(Debug, Clone)]
38pub struct MessagePool {
39 inner: Arc<Mutex<Vec<crate::Message>>>,
40 max_size: usize,
41}
42
43impl MessagePool {
44 pub fn new() -> Self {
46 Self::with_capacity(1000)
47 }
48
49 pub fn with_capacity(max_size: usize) -> Self {
51 Self {
52 inner: Arc::new(Mutex::new(Vec::new())),
53 max_size,
54 }
55 }
56
57 pub fn acquire(&self) -> PooledMessage {
62 let msg = {
63 let mut pool = self.inner.lock().expect("lock should not be poisoned");
64 pool.pop()
65 };
66
67 let mut msg =
68 msg.unwrap_or_else(|| crate::Message::new("".to_string(), Uuid::nil(), Vec::new()));
69
70 msg.headers.task.clear();
72 msg.headers.id = Uuid::nil();
73 msg.headers.lang = crate::DEFAULT_LANG.to_string();
74 msg.headers.root_id = None;
75 msg.headers.parent_id = None;
76 msg.headers.group = None;
77 msg.headers.retries = None;
78 msg.headers.eta = None;
79 msg.headers.expires = None;
80 msg.headers.extra.clear();
81 msg.properties = crate::MessageProperties::default();
82 msg.body.clear();
83 msg.content_type = crate::CONTENT_TYPE_JSON.to_string();
84 msg.content_encoding = crate::ENCODING_UTF8.to_string();
85
86 PooledMessage {
87 message: Some(msg),
88 pool: self.clone(),
89 }
90 }
91
92 fn release(&self, msg: crate::Message) {
94 let mut pool = self.inner.lock().expect("lock should not be poisoned");
95 if pool.len() < self.max_size {
96 pool.push(msg);
97 }
98 }
100
101 #[inline]
103 pub fn size(&self) -> usize {
104 self.inner
105 .lock()
106 .expect("lock should not be poisoned")
107 .len()
108 }
109
110 #[inline]
112 pub fn max_size(&self) -> usize {
113 self.max_size
114 }
115
116 pub fn clear(&self) {
118 self.inner
119 .lock()
120 .expect("lock should not be poisoned")
121 .clear();
122 }
123}
124
125impl Default for MessagePool {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131pub struct PooledMessage {
135 message: Option<crate::Message>,
136 pool: MessagePool,
137}
138
139impl PooledMessage {
140 pub fn take(mut self) -> crate::Message {
142 self.message
143 .take()
144 .expect("pooled message should always contain a message")
145 }
146
147 #[inline]
149 pub fn get(&self) -> &crate::Message {
150 self.message
151 .as_ref()
152 .expect("pooled message should always contain a message")
153 }
154
155 #[inline]
157 pub fn get_mut(&mut self) -> &mut crate::Message {
158 self.message
159 .as_mut()
160 .expect("pooled message should always contain a message")
161 }
162}
163
164impl std::ops::Deref for PooledMessage {
165 type Target = crate::Message;
166
167 fn deref(&self) -> &Self::Target {
168 self.message
169 .as_ref()
170 .expect("pooled message should always contain a message")
171 }
172}
173
174impl std::ops::DerefMut for PooledMessage {
175 fn deref_mut(&mut self) -> &mut Self::Target {
176 self.message
177 .as_mut()
178 .expect("pooled message should always contain a message")
179 }
180}
181
182impl Drop for PooledMessage {
183 fn drop(&mut self) {
184 if let Some(msg) = self.message.take() {
185 self.pool.release(msg);
186 }
187 }
188}
189
190#[derive(Debug, Clone)]
192pub struct TaskArgsPool {
193 inner: Arc<Mutex<Vec<crate::TaskArgs>>>,
194 max_size: usize,
195}
196
197impl TaskArgsPool {
198 pub fn new() -> Self {
200 Self::with_capacity(1000)
201 }
202
203 pub fn with_capacity(max_size: usize) -> Self {
205 Self {
206 inner: Arc::new(Mutex::new(Vec::new())),
207 max_size,
208 }
209 }
210
211 pub fn acquire(&self) -> PooledTaskArgs {
213 let args = {
214 let mut pool = self.inner.lock().expect("lock should not be poisoned");
215 pool.pop()
216 };
217
218 let mut args = args.unwrap_or_else(crate::TaskArgs::new);
219
220 args.args.clear();
222 args.kwargs.clear();
223
224 PooledTaskArgs {
225 args: Some(args),
226 pool: self.clone(),
227 }
228 }
229
230 fn release(&self, args: crate::TaskArgs) {
232 let mut pool = self.inner.lock().expect("lock should not be poisoned");
233 if pool.len() < self.max_size {
234 pool.push(args);
235 }
236 }
237
238 #[inline]
240 pub fn size(&self) -> usize {
241 self.inner
242 .lock()
243 .expect("lock should not be poisoned")
244 .len()
245 }
246
247 #[inline]
249 pub fn max_size(&self) -> usize {
250 self.max_size
251 }
252
253 pub fn clear(&self) {
255 self.inner
256 .lock()
257 .expect("lock should not be poisoned")
258 .clear();
259 }
260}
261
262impl Default for TaskArgsPool {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
268pub struct PooledTaskArgs {
270 args: Option<crate::TaskArgs>,
271 pool: TaskArgsPool,
272}
273
274impl PooledTaskArgs {
275 pub fn take(mut self) -> crate::TaskArgs {
277 self.args
278 .take()
279 .expect("pooled task args should always contain args")
280 }
281
282 #[inline]
284 pub fn get(&self) -> &crate::TaskArgs {
285 self.args
286 .as_ref()
287 .expect("pooled task args should always contain args")
288 }
289
290 #[inline]
292 pub fn get_mut(&mut self) -> &mut crate::TaskArgs {
293 self.args
294 .as_mut()
295 .expect("pooled task args should always contain args")
296 }
297}
298
299impl std::ops::Deref for PooledTaskArgs {
300 type Target = crate::TaskArgs;
301
302 fn deref(&self) -> &Self::Target {
303 self.args
304 .as_ref()
305 .expect("pooled task args should always contain args")
306 }
307}
308
309impl std::ops::DerefMut for PooledTaskArgs {
310 fn deref_mut(&mut self) -> &mut Self::Target {
311 self.args
312 .as_mut()
313 .expect("pooled task args should always contain args")
314 }
315}
316
317impl Drop for PooledTaskArgs {
318 fn drop(&mut self) {
319 if let Some(args) = self.args.take() {
320 self.pool.release(args);
321 }
322 }
323}
324
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_message_pool_basic() {
        let pool = MessagePool::new();
        assert_eq!(pool.size(), 0);

        // A checked-out message is not counted as idle.
        let msg = pool.acquire();
        assert_eq!(pool.size(), 0);

        // Dropping the guard hands the message back to the pool.
        drop(msg);
        assert_eq!(pool.size(), 1);
    }

    #[test]
    fn test_message_pool_reuse() {
        let pool = MessagePool::new();

        let mut msg1 = pool.acquire();
        msg1.headers.task = "test".to_string();
        drop(msg1);

        // The recycled message must come back reset, and the idle list is empty again.
        let msg2 = pool.acquire();
        assert_eq!(msg2.headers.task, "");
        assert_eq!(pool.size(), 0);
    }

    #[test]
    fn test_message_pool_max_size() {
        let pool = MessagePool::with_capacity(2);

        let msg1 = pool.acquire();
        let msg2 = pool.acquire();
        let msg3 = pool.acquire();

        drop(msg1);
        drop(msg2);
        drop(msg3);

        // Only `max_size` messages are retained; the third one is discarded.
        assert_eq!(pool.size(), 2);
    }

    #[test]
    fn test_pooled_message_deref() {
        let pool = MessagePool::new();
        let mut msg = pool.acquire();

        msg.headers.task = "tasks.test".to_string();
        assert_eq!(msg.headers.task, "tasks.test");
    }

    #[test]
    fn test_pooled_message_take() {
        let pool = MessagePool::new();
        let mut msg = pool.acquire();
        msg.headers.task = "tasks.test".to_string();

        let owned = msg.take();
        assert_eq!(owned.headers.task, "tasks.test");

        // A taken message never goes back to the pool.
        assert_eq!(pool.size(), 0);
    }

    #[test]
    fn test_task_args_pool_basic() {
        let pool = TaskArgsPool::new();
        assert_eq!(pool.size(), 0);

        let args = pool.acquire();
        assert_eq!(pool.size(), 0);

        drop(args);
        assert_eq!(pool.size(), 1);
    }

    #[test]
    fn test_task_args_pool_reuse() {
        let pool = TaskArgsPool::new();

        {
            let mut args1 = pool.acquire();
            args1.get_mut().args.push(serde_json::json!(42));
        }

        // The recycled container must come back empty.
        let args2 = pool.acquire();
        assert_eq!(args2.get().args.len(), 0);
    }

    #[test]
    fn test_task_args_pool_deref() {
        let pool = TaskArgsPool::new();
        let mut pooled = pool.acquire();

        // Inside this module `pooled.args` names the guard's own private `Option` field,
        // not `TaskArgs::args`, so reborrow through deref coercion to make sure the
        // `DerefMut` and `Deref` impls are what is being exercised.
        let inner: &mut crate::TaskArgs = &mut pooled;
        inner.args.push(serde_json::json!(1));

        let view: &crate::TaskArgs = &pooled;
        assert_eq!(view.args.len(), 1);
    }

    #[test]
    fn test_task_args_pool_take() {
        let pool = TaskArgsPool::new();
        let mut args = pool.acquire();
        args.get_mut().args.push(serde_json::json!(1));

        let owned = args.take();
        assert_eq!(owned.args.len(), 1);

        // A taken container never goes back to the pool.
        assert_eq!(pool.size(), 0);
    }

    #[test]
    fn test_pool_clear() {
        let pool = MessagePool::new();

        let msg1 = pool.acquire();
        let msg2 = pool.acquire();
        drop(msg1);
        drop(msg2);

        assert_eq!(pool.size(), 2);

        pool.clear();
        assert_eq!(pool.size(), 0);
    }
}