celers_protocol/
pool.rs

1//! Message pooling for memory efficiency
2//!
3//! This module provides object pooling for messages to reduce allocation overhead
4//! in high-throughput scenarios. By reusing message structures, we can significantly
5//! reduce GC pressure and improve performance.
6//!
7//! # Examples
8//!
9//! ```
10//! use celers_protocol::pool::{MessagePool, PooledMessage};
11//! use uuid::Uuid;
12//!
13//! // Create a message pool
14//! let pool = MessagePool::new();
15//!
16//! // Acquire a pooled message
17//! let mut msg = pool.acquire();
18//! msg.headers.task = "tasks.add".to_string();
19//! msg.headers.id = Uuid::new_v4();
20//! msg.body = vec![1, 2, 3];
21//!
22//! // When dropped, the message is returned to the pool
23//! drop(msg);
24//!
25//! // The next acquire will reuse the same allocation
26//! let msg2 = pool.acquire();
27//! assert_eq!(pool.size(), 0); // Pool is now empty
28//! ```
29
30use std::sync::{Arc, Mutex};
31use uuid::Uuid;
32
33/// A pool of reusable messages
34///
35/// Uses a simple stack-based pool with thread-safe access.
36/// Messages are automatically returned to the pool when dropped.
37#[derive(Debug, Clone)]
38pub struct MessagePool {
39    inner: Arc<Mutex<Vec<crate::Message>>>,
40    max_size: usize,
41}
42
43impl MessagePool {
44    /// Create a new message pool with default capacity (1000)
45    pub fn new() -> Self {
46        Self::with_capacity(1000)
47    }
48
49    /// Create a new message pool with specified maximum capacity
50    pub fn with_capacity(max_size: usize) -> Self {
51        Self {
52            inner: Arc::new(Mutex::new(Vec::new())),
53            max_size,
54        }
55    }
56
57    /// Acquire a message from the pool
58    ///
59    /// If the pool is empty, a new message is allocated.
60    /// Otherwise, a recycled message is returned (after clearing).
61    pub fn acquire(&self) -> PooledMessage {
62        let msg = {
63            let mut pool = self.inner.lock().expect("lock should not be poisoned");
64            pool.pop()
65        };
66
67        let mut msg =
68            msg.unwrap_or_else(|| crate::Message::new("".to_string(), Uuid::nil(), Vec::new()));
69
70        // Clear the message for reuse
71        msg.headers.task.clear();
72        msg.headers.id = Uuid::nil();
73        msg.headers.lang = crate::DEFAULT_LANG.to_string();
74        msg.headers.root_id = None;
75        msg.headers.parent_id = None;
76        msg.headers.group = None;
77        msg.headers.retries = None;
78        msg.headers.eta = None;
79        msg.headers.expires = None;
80        msg.headers.extra.clear();
81        msg.properties = crate::MessageProperties::default();
82        msg.body.clear();
83        msg.content_type = crate::CONTENT_TYPE_JSON.to_string();
84        msg.content_encoding = crate::ENCODING_UTF8.to_string();
85
86        PooledMessage {
87            message: Some(msg),
88            pool: self.clone(),
89        }
90    }
91
92    /// Return a message to the pool
93    fn release(&self, msg: crate::Message) {
94        let mut pool = self.inner.lock().expect("lock should not be poisoned");
95        if pool.len() < self.max_size {
96            pool.push(msg);
97        }
98        // Otherwise, drop the message (pool is full)
99    }
100
101    /// Get the current number of messages in the pool
102    #[inline]
103    pub fn size(&self) -> usize {
104        self.inner
105            .lock()
106            .expect("lock should not be poisoned")
107            .len()
108    }
109
110    /// Get the maximum pool size
111    #[inline]
112    pub fn max_size(&self) -> usize {
113        self.max_size
114    }
115
116    /// Clear all messages from the pool
117    pub fn clear(&self) {
118        self.inner
119            .lock()
120            .expect("lock should not be poisoned")
121            .clear();
122    }
123}
124
125impl Default for MessagePool {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131/// A pooled message that automatically returns to the pool when dropped
132///
133/// Provides deref access to the underlying message.
134pub struct PooledMessage {
135    message: Option<crate::Message>,
136    pool: MessagePool,
137}
138
139impl PooledMessage {
140    /// Take ownership of the message, removing it from pool management
141    pub fn take(mut self) -> crate::Message {
142        self.message
143            .take()
144            .expect("pooled message should always contain a message")
145    }
146
147    /// Get a reference to the message
148    #[inline]
149    pub fn get(&self) -> &crate::Message {
150        self.message
151            .as_ref()
152            .expect("pooled message should always contain a message")
153    }
154
155    /// Get a mutable reference to the message
156    #[inline]
157    pub fn get_mut(&mut self) -> &mut crate::Message {
158        self.message
159            .as_mut()
160            .expect("pooled message should always contain a message")
161    }
162}
163
164impl std::ops::Deref for PooledMessage {
165    type Target = crate::Message;
166
167    fn deref(&self) -> &Self::Target {
168        self.message
169            .as_ref()
170            .expect("pooled message should always contain a message")
171    }
172}
173
174impl std::ops::DerefMut for PooledMessage {
175    fn deref_mut(&mut self) -> &mut Self::Target {
176        self.message
177            .as_mut()
178            .expect("pooled message should always contain a message")
179    }
180}
181
182impl Drop for PooledMessage {
183    fn drop(&mut self) {
184        if let Some(msg) = self.message.take() {
185            self.pool.release(msg);
186        }
187    }
188}
189
190/// A pool of reusable task arguments
191#[derive(Debug, Clone)]
192pub struct TaskArgsPool {
193    inner: Arc<Mutex<Vec<crate::TaskArgs>>>,
194    max_size: usize,
195}
196
197impl TaskArgsPool {
198    /// Create a new task args pool with default capacity (1000)
199    pub fn new() -> Self {
200        Self::with_capacity(1000)
201    }
202
203    /// Create a new task args pool with specified maximum capacity
204    pub fn with_capacity(max_size: usize) -> Self {
205        Self {
206            inner: Arc::new(Mutex::new(Vec::new())),
207            max_size,
208        }
209    }
210
211    /// Acquire task arguments from the pool
212    pub fn acquire(&self) -> PooledTaskArgs {
213        let args = {
214            let mut pool = self.inner.lock().expect("lock should not be poisoned");
215            pool.pop()
216        };
217
218        let mut args = args.unwrap_or_else(crate::TaskArgs::new);
219
220        // Clear for reuse
221        args.args.clear();
222        args.kwargs.clear();
223
224        PooledTaskArgs {
225            args: Some(args),
226            pool: self.clone(),
227        }
228    }
229
230    /// Return task arguments to the pool
231    fn release(&self, args: crate::TaskArgs) {
232        let mut pool = self.inner.lock().expect("lock should not be poisoned");
233        if pool.len() < self.max_size {
234            pool.push(args);
235        }
236    }
237
238    /// Get the current number of task args in the pool
239    #[inline]
240    pub fn size(&self) -> usize {
241        self.inner
242            .lock()
243            .expect("lock should not be poisoned")
244            .len()
245    }
246
247    /// Get the maximum pool size
248    #[inline]
249    pub fn max_size(&self) -> usize {
250        self.max_size
251    }
252
253    /// Clear the pool
254    pub fn clear(&self) {
255        self.inner
256            .lock()
257            .expect("lock should not be poisoned")
258            .clear();
259    }
260}
261
262impl Default for TaskArgsPool {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268/// Pooled task arguments
269pub struct PooledTaskArgs {
270    args: Option<crate::TaskArgs>,
271    pool: TaskArgsPool,
272}
273
274impl PooledTaskArgs {
275    /// Take ownership, removing from pool management
276    pub fn take(mut self) -> crate::TaskArgs {
277        self.args
278            .take()
279            .expect("pooled task args should always contain args")
280    }
281
282    /// Get a reference
283    #[inline]
284    pub fn get(&self) -> &crate::TaskArgs {
285        self.args
286            .as_ref()
287            .expect("pooled task args should always contain args")
288    }
289
290    /// Get a mutable reference
291    #[inline]
292    pub fn get_mut(&mut self) -> &mut crate::TaskArgs {
293        self.args
294            .as_mut()
295            .expect("pooled task args should always contain args")
296    }
297}
298
299impl std::ops::Deref for PooledTaskArgs {
300    type Target = crate::TaskArgs;
301
302    fn deref(&self) -> &Self::Target {
303        self.args
304            .as_ref()
305            .expect("pooled task args should always contain args")
306    }
307}
308
309impl std::ops::DerefMut for PooledTaskArgs {
310    fn deref_mut(&mut self) -> &mut Self::Target {
311        self.args
312            .as_mut()
313            .expect("pooled task args should always contain args")
314    }
315}
316
317impl Drop for PooledTaskArgs {
318    fn drop(&mut self) {
319        if let Some(args) = self.args.take() {
320            self.pool.release(args);
321        }
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use super::*;
328
329    #[test]
330    fn test_message_pool_basic() {
331        let pool = MessagePool::new();
332        assert_eq!(pool.size(), 0);
333
334        let msg = pool.acquire();
335        assert_eq!(pool.size(), 0); // Acquired, not in pool
336
337        drop(msg);
338        assert_eq!(pool.size(), 1); // Returned to pool
339    }
340
341    #[test]
342    fn test_message_pool_reuse() {
343        let pool = MessagePool::new();
344
345        // First acquire creates new
346        let mut msg1 = pool.acquire();
347        msg1.headers.task = "test".to_string();
348        drop(msg1);
349
350        // Second acquire reuses
351        let msg2 = pool.acquire();
352        assert_eq!(msg2.headers.task, ""); // Cleared
353        assert_eq!(pool.size(), 0);
354    }
355
356    #[test]
357    fn test_message_pool_max_size() {
358        let pool = MessagePool::with_capacity(2);
359
360        let msg1 = pool.acquire();
361        let msg2 = pool.acquire();
362        let msg3 = pool.acquire();
363
364        drop(msg1);
365        drop(msg2);
366        drop(msg3);
367
368        // Only 2 should be retained (max_size = 2)
369        assert_eq!(pool.size(), 2);
370    }
371
372    #[test]
373    fn test_pooled_message_deref() {
374        let pool = MessagePool::new();
375        let mut msg = pool.acquire();
376
377        msg.headers.task = "tasks.test".to_string();
378        assert_eq!(msg.headers.task, "tasks.test");
379    }
380
381    #[test]
382    fn test_pooled_message_take() {
383        let pool = MessagePool::new();
384        let mut msg = pool.acquire();
385        msg.headers.task = "tasks.test".to_string();
386
387        let owned = msg.take();
388        assert_eq!(owned.headers.task, "tasks.test");
389
390        // Message was taken, not returned to pool
391        assert_eq!(pool.size(), 0);
392    }
393
394    #[test]
395    fn test_task_args_pool_basic() {
396        let pool = TaskArgsPool::new();
397        assert_eq!(pool.size(), 0);
398
399        let args = pool.acquire();
400        assert_eq!(pool.size(), 0);
401
402        drop(args);
403        assert_eq!(pool.size(), 1);
404    }
405
406    #[test]
407    fn test_task_args_pool_reuse() {
408        let pool = TaskArgsPool::new();
409
410        {
411            let mut args1 = pool.acquire();
412            args1.get_mut().args.push(serde_json::json!(42));
413        }
414
415        let args2 = pool.acquire();
416        assert_eq!(args2.get().args.len(), 0); // Cleared
417    }
418
419    #[test]
420    fn test_task_args_pool_deref() {
421        let pool = TaskArgsPool::new();
422        let mut args = pool.acquire();
423
424        args.get_mut().args.push(serde_json::json!(1));
425        assert_eq!(args.get().args.len(), 1);
426    }
427
428    #[test]
429    fn test_task_args_pool_take() {
430        let pool = TaskArgsPool::new();
431        let mut args = pool.acquire();
432        args.get_mut().args.push(serde_json::json!(1));
433
434        let owned = args.take();
435        assert_eq!(owned.args.len(), 1);
436
437        assert_eq!(pool.size(), 0);
438    }
439
440    #[test]
441    fn test_pool_clear() {
442        let pool = MessagePool::new();
443
444        let msg1 = pool.acquire();
445        let msg2 = pool.acquire();
446        drop(msg1);
447        drop(msg2);
448
449        assert_eq!(pool.size(), 2);
450
451        pool.clear();
452        assert_eq!(pool.size(), 0);
453    }
454}