Skip to main content

zenoh_sync/
object_pool.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    any::Any,
16    fmt,
17    ops::{Deref, DerefMut, Drop},
18    sync::{Arc, Weak},
19};
20
21use zenoh_buffers::ZSliceBuffer;
22
23use super::LifoQueue;
24
25/// Provides a pool of pre-allocated objects that are automaticlaly reinserted into
26/// the pool when dropped.
27pub struct RecyclingObjectPool<T, F>
28where
29    F: Fn() -> T,
30{
31    inner: Arc<LifoQueue<T>>,
32    f: F,
33}
34
35impl<T, F: Fn() -> T + Clone> Clone for RecyclingObjectPool<T, F> {
36    fn clone(&self) -> Self {
37        Self {
38            inner: self.inner.clone(),
39            f: self.f.clone(),
40        }
41    }
42}
43
44impl<T, F: Fn() -> T> RecyclingObjectPool<T, F> {
45    pub fn new(num: usize, f: F) -> RecyclingObjectPool<T, F> {
46        let inner: Arc<LifoQueue<T>> = Arc::new(LifoQueue::new(num));
47        for _ in 0..num {
48            let obj = (f)();
49            inner.try_push(obj);
50        }
51        RecyclingObjectPool { inner, f }
52    }
53
54    pub fn alloc(&self) -> RecyclingObject<T> {
55        RecyclingObject::new((self.f)(), Weak::new())
56    }
57
58    pub fn try_take(&self) -> Option<RecyclingObject<T>> {
59        self.inner
60            .try_pull()
61            .map(|obj| RecyclingObject::new(obj, Arc::downgrade(&self.inner)))
62    }
63
64    pub fn take(&self) -> RecyclingObject<T> {
65        let obj = self.inner.pull();
66        RecyclingObject::new(obj, Arc::downgrade(&self.inner))
67    }
68}
69
70#[derive(Clone)]
71pub struct RecyclingObject<T> {
72    pool: Weak<LifoQueue<T>>,
73    object: Option<T>,
74}
75
76impl<T> RecyclingObject<T> {
77    pub fn new(obj: T, pool: Weak<LifoQueue<T>>) -> RecyclingObject<T> {
78        RecyclingObject {
79            pool,
80            object: Some(obj),
81        }
82    }
83
84    pub fn recycle(mut self) {
85        if let Some(pool) = self.pool.upgrade() {
86            if let Some(obj) = self.object.take() {
87                pool.push(obj);
88            }
89        }
90    }
91}
92
93impl<T: PartialEq> Eq for RecyclingObject<T> {}
94
95impl<T: PartialEq> PartialEq for RecyclingObject<T> {
96    fn eq(&self, other: &Self) -> bool {
97        self.object == other.object
98    }
99}
100
101impl<T> Deref for RecyclingObject<T> {
102    type Target = T;
103    #[inline]
104    fn deref(&self) -> &Self::Target {
105        self.object.as_ref().unwrap()
106    }
107}
108
109impl<T> DerefMut for RecyclingObject<T> {
110    #[inline]
111    fn deref_mut(&mut self) -> &mut Self::Target {
112        self.object.as_mut().unwrap()
113    }
114}
115
116impl<T> From<T> for RecyclingObject<T> {
117    fn from(obj: T) -> RecyclingObject<T> {
118        RecyclingObject::new(obj, Weak::new())
119    }
120}
121
122impl<T> Drop for RecyclingObject<T> {
123    fn drop(&mut self) {
124        if let Some(pool) = self.pool.upgrade() {
125            if let Some(obj) = self.object.take() {
126                pool.push(obj);
127            }
128        }
129    }
130}
131
132impl<T: fmt::Debug> fmt::Debug for RecyclingObject<T> {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("").field("inner", &self.object).finish()
135    }
136}
137
138// Buffer impl
139impl AsRef<[u8]> for RecyclingObject<Box<[u8]>> {
140    fn as_ref(&self) -> &[u8] {
141        self.deref()
142    }
143}
144
145impl AsMut<[u8]> for RecyclingObject<Box<[u8]>> {
146    fn as_mut(&mut self) -> &mut [u8] {
147        self.deref_mut()
148    }
149}
150
151impl ZSliceBuffer for RecyclingObject<Box<[u8]>> {
152    fn as_slice(&self) -> &[u8] {
153        self.as_ref()
154    }
155
156    fn as_any(&self) -> &dyn Any {
157        self
158    }
159
160    fn as_any_mut(&mut self) -> &mut dyn Any {
161        self
162    }
163}